我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用sqlalchemy.sql.ClauseElement()。
def get_or_create(session, model, defaults=None, **kwargs): """ :param Session session: :param model: :param defaults: :param kwargs: :return: """ instance = session.query(model).filter_by(**kwargs).first() if instance: return instance, False else: params = dict((k, v) for k, v in kwargs.items() if not isinstance(v, ClauseElement)) params.update(defaults or {}) instance = model(**params) session.add(instance) return instance, True
def _render_potential_expr(value, autogen_context, wrap_in_text=True): if isinstance(value, sql.ClauseElement): if compat.sqla_08: compile_kw = dict(compile_kwargs={ 'literal_binds': True, "include_table": False}) else: compile_kw = {} if wrap_in_text: template = "%(prefix)stext(%(sql)r)" else: template = "%(sql)r" return template % { "prefix": _sqlalchemy_autogenerate_prefix(autogen_context), "sql": compat.text_type( value.compile(dialect=autogen_context.dialect, **compile_kw) ) } else: return repr(value)
def __init__(self, engine, statement, parameters=None, typemap=None, **kwargs): """constructs a new ANSICompiler object. engine - SQLEngine to compile against statement - ClauseElement to be compiled parameters - optional dictionary indicating a set of bind parameters specified with this Compiled object. These parameters are the "default" key/value pairs when the Compiled is executed, and also may affect the actual compilation, as in the case of an INSERT where the actual columns inserted will correspond to the keys present in the parameters.""" sql.Compiled.__init__(self, engine, statement, parameters) self.binds = {} self.froms = {} self.wheres = {} self.strings = {} self.select_stack = [] self.typemap = typemap or {} self.isinsert = False
def visit_update(self, update_stmt): colparams = self._get_colparams(update_stmt) def create_param(p): if isinstance(p, sql.BindParamClause): self.binds[p.key] = p self.binds[p.shortname] = p return self.bindparam_string(p.key) else: p.accept_visitor(self) if isinstance(p, sql.ClauseElement): return "(" + self.get_str(p) + ")" else: return self.get_str(p) text = "UPDATE " + update_stmt.table.fullname + " SET " + string.join(["%s=%s" % (c[0].name, create_param(c[1])) for c in colparams], ', ') if update_stmt.whereclause: text += " WHERE " + self.get_str(update_stmt.whereclause) self.strings[update_stmt] = text
def _render_potential_expr(value, autogen_context): if isinstance(value, sql.ClauseElement): if compat.sqla_08: compile_kw = dict(compile_kwargs={'literal_binds': True}) else: compile_kw = {} return "%(prefix)stext(%(sql)r)" % { "prefix": _sqlalchemy_autogenerate_prefix(autogen_context), "sql": str( value.compile(dialect=autogen_context['dialect'], **compile_kw) ) } else: return repr(value)
def _render_potential_expr(value, autogen_context, wrap_in_text=True): if isinstance(value, sql.ClauseElement): if util.sqla_08: compile_kw = dict(compile_kwargs={ 'literal_binds': True, "include_table": False}) else: compile_kw = {} if wrap_in_text: template = "%(prefix)stext(%(sql)r)" else: template = "%(sql)r" return template % { "prefix": _sqlalchemy_autogenerate_prefix(autogen_context), "sql": compat.text_type( value.compile(dialect=autogen_context.dialect, **compile_kw) ) } else: return repr(value)
def _render_potential_expr(value, autogen_context, wrap_in_text=True): if isinstance(value, sql.ClauseElement): if compat.sqla_08: compile_kw = dict(compile_kwargs={'literal_binds': True}) else: compile_kw = {} if wrap_in_text: template = "%(prefix)stext(%(sql)r)" else: template = "%(sql)r" return template % { "prefix": _sqlalchemy_autogenerate_prefix(autogen_context), "sql": compat.text_type( value.compile(dialect=autogen_context['dialect'], **compile_kw) ) } else: return repr(value)
def update(self, **values): cls = type(self._instance) for key, value in values.items(): prop = cls.__dict__.get(key) if isinstance(prop, json_support.JSONProperty): value_from = '__profile__' method = self._set_prop k = prop else: value_from = '__values__' method = self._set k = key if not isinstance(value, ClauseElement): setattr(self._instance, key, value) value = getattr(self._instance, value_from)[key] method(k, value) return self
def visit_column_default(self, default): if isinstance(default.arg, sql.ClauseElement): return self.exec_default_sql(default) elif callable(default.arg): return default.arg() else: return default.arg
def compiler(self, statement, parameters): """returns a sql.ClauseVisitor which will produce a string representation of the given ClauseElement and parameter dictionary. This object is usually a subclass of ansisql.ANSICompiler. compiler is called within the context of the compile() method.""" raise NotImplementedError()
def compile(self, statement, parameters, **kwargs): """given a sql.ClauseElement statement plus optional bind parameters, creates a new instance of this engine's SQLCompiler, compiles the ClauseElement, and returns the newly compiled object.""" compiler = self.compiler(statement, parameters, **kwargs) statement.accept_visitor(compiler) compiler.after_compile() return compiler
def compile_query(query, dialect=_dialect, inline=False): if isinstance(query, str): query_logger.debug(query) return query, () elif isinstance(query, ClauseElement): query = execute_defaults(query) # default values for Insert/Update compiled = query.compile(dialect=dialect) params = _get_keys(compiled) new_query, new_params = _replace_keys(compiled.string, params) query_logger.debug(new_query) if inline: return new_query return new_query, new_params
def _set_prop(self, prop, value): if isinstance(value, ClauseElement): self._literal = False self._props[prop] = value
def get_or_create(session, model, defaults=None, **kwargs): instance = session.query(model).filter_by(**kwargs).first() if instance: return instance, False else: params = dict((k, v) for k, v in kwargs.iteritems() if not isinstance(v, ClauseElement)) params.update(defaults or {}) instance = model(**params) session.add(instance) return instance, True
def get_expressions_or_columns(self): expr_columns = util.zip_longest(self.expressions, self.columns) return [ (expr if isinstance(expr, ClauseElement) else colexpr) for expr, colexpr in expr_columns ]
def apply(self, bind=None, timeout=DEFAULT): cls = type(self._instance) values = self._values.copy() # handle JSON columns json_updates = {} for prop, value in self._props.items(): value = prop.save(self._instance, value) updates = json_updates.setdefault(prop.column_name, {}) if self._literal: updates[prop.name] = value else: if isinstance(value, int): value = sa.cast(value, sa.BigInteger) elif not isinstance(value, ClauseElement): value = sa.cast(value, sa.Unicode) updates[sa.cast(prop.name, sa.Unicode)] = value for column_name, updates in json_updates.items(): column = getattr(cls, column_name) if self._literal: values[column_name] = column.concat(updates) else: if isinstance(column.type, sa_pg.JSONB): func = sa.func.jsonb_build_object else: func = sa.func.json_build_object values[column_name] = column.concat( func(*itertools.chain(*updates.items()))) opts = dict(return_model=False) if timeout is not DEFAULT: opts['timeout'] = timeout clause = self._clause.values( **values, ).returning( *[getattr(cls, key) for key in values], ).execution_options(**opts) row = await cls.__metadata__.first(clause, bind=bind) if not row: raise NoSuchRowError() self._instance.__values__.update(row) for prop in self._props: prop.reload(self._instance) return self