我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用sqlalchemy.util.to_list()。
def only_on(dbs, reason=None): return only_if( OrPredicate([SpecPredicate(db) for db in util.to_list(dbs)]) )
def _compile(self, whereclause = None, **kwargs): order_by = kwargs.pop('order_by', False) if order_by is False: order_by = self.order_by if order_by is False: if self.table.default_order_by() is not None: order_by = self.table.default_order_by() if self._should_nest(**kwargs): s2 = sql.select(self.table.primary_key, whereclause, use_labels=True, **kwargs) if not kwargs.get('distinct', False) and order_by: s2.order_by(*util.to_list(order_by)) s3 = s2.alias('rowcount') crit = [] for i in range(0, len(self.table.primary_key)): crit.append(s3.primary_key[i] == self.table.primary_key[i]) statement = sql.select([], sql.and_(*crit), from_obj=[self.table], use_labels=True) if order_by: statement.order_by(*util.to_list(order_by)) else: statement = sql.select([], whereclause, from_obj=[self.table], use_labels=True, **kwargs) if order_by: statement.order_by(*util.to_list(order_by)) # for a DISTINCT query, you need the columns explicitly specified in order # to use it in "order_by". insure they are in the column criterion (particularly oid). # TODO: this should be done at the SQL level not the mapper level if kwargs.get('distinct', False) and order_by: statement.append_column(*util.to_list(order_by)) # plugin point # give all the attached properties a chance to modify the query for key, value in self.props.iteritems(): value.setup(key, statement, **kwargs) return statement
def setup(self, key, statement, recursion_stack = None, eagertable=None, **options): """add a left outer join to the statement thats being constructed""" if recursion_stack is None: recursion_stack = {} if statement.whereclause is not None: # "aliasize" the tables referenced in the user-defined whereclause to not # collide with the tables used by the eager load # note that we arent affecting the mapper's table, nor our own primary or secondary joins aliasizer = Aliasizer(*self.to_alias) statement.whereclause.accept_visitor(aliasizer) for alias in aliasizer.aliases.values(): statement.append_from(alias) if hasattr(statement, '_outerjoin'): towrap = statement._outerjoin else: towrap = self.parent.table if self.secondaryjoin is not None: statement._outerjoin = sql.outerjoin(towrap, self.secondary, self.eagerprimary).outerjoin(self.eagertarget, self.eagersecondary) if self.order_by is False and self.secondary.default_order_by() is not None: statement.order_by(*self.secondary.default_order_by()) else: statement._outerjoin = towrap.outerjoin(self.eagertarget, self.eagerprimary) if self.order_by is False and self.eagertarget.default_order_by() is not None: statement.order_by(*self.eagertarget.default_order_by()) if self.eager_order_by: statement.order_by(*util.to_list(self.eager_order_by)) statement.append_from(statement._outerjoin) recursion_stack[self] = True try: for key, value in self.mapper.props.iteritems(): if recursion_stack.has_key(value): raise "Circular eager load relationship detected on " + str(self.mapper) + " " + key + repr(self.mapper.props) value.setup(key, statement, recursion_stack=recursion_stack, eagertable=self.eagertarget) finally: del recursion_stack[self]
def get(self, ident, **kwargs): if self._shard_id is not None: return super(ShardedQuery, self).get(ident) else: ident = util.to_list(ident) for shard_id in self.id_chooser(self, ident): o = self.set_shard(shard_id).get(ident, **kwargs) if o is not None: return o else: return None
def only_on(dbs, reason=None): return only_if( OrPredicate([Predicate.as_predicate(db) for db in util.to_list(dbs)]) )
def _filter_or_exclude(self, negate, kwargs): q = self def negate_if(expr): return expr if not negate else ~expr column = None for arg, value in kwargs.items(): for token in arg.split('__'): if column is None: column = _entity_descriptor(q._joinpoint_zero(), token) if column.impl.uses_objects: q = q.join(column) column = None elif token in ['in']: op = self._underscore_operators[token] q = q.filter(negate_if(op(column, value))) column = None elif token in self._underscore_operators: op = self._underscore_operators[token] q = q.filter(negate_if(op(column, *to_list(value)))) column = None else: raise ValueError('No idea what to do with %r' % token) if column is not None: q = q.filter(negate_if(column == value)) column = None q = q.reset_joinpoint() return q
def _possible_configs_for_cls(cls, reasons=None): all_configs = set(config.Config.all_configs()) if cls.__unsupported_on__: spec = exclusions.db_spec(*cls.__unsupported_on__) for config_obj in list(all_configs): if spec(config_obj): all_configs.remove(config_obj) if getattr(cls, '__only_on__', None): spec = exclusions.db_spec(*util.to_list(cls.__only_on__)) for config_obj in list(all_configs): if not spec(config_obj): all_configs.remove(config_obj) if hasattr(cls, '__requires__'): requirements = config.requirements for config_obj in list(all_configs): for requirement in cls.__requires__: check = getattr(requirements, requirement) skip_reasons = check.matching_config_reasons(config_obj) if skip_reasons: all_configs.remove(config_obj) if reasons is not None: reasons.extend(skip_reasons) break if hasattr(cls, '__prefer_requires__'): non_preferred = set() requirements = config.requirements for config_obj in list(all_configs): for requirement in cls.__prefer_requires__: check = getattr(requirements, requirement) if not check.enabled_for_config(config_obj): non_preferred.add(config_obj) if all_configs.difference(non_preferred): all_configs.difference_update(non_preferred) for db_spec, op, spec in getattr(cls, '__excluded_on__', ()): for config_obj in list(all_configs): if not exclusions.skip_if( exclusions.SpecPredicate(db_spec, op, spec) ).enabled_for_config(config_obj): all_configs.remove(config_obj) return all_configs
def _do_skips(cls): reasons = [] all_configs = _possible_configs_for_cls(cls, reasons) if getattr(cls, '__skip_if__', False): for c in getattr(cls, '__skip_if__'): if c(): raise SkipTest("'%s' skipped by %s" % ( cls.__name__, c.__name__) ) if not all_configs: if getattr(cls, '__backend__', False): msg = "'%s' unsupported for implementation '%s'" % ( cls.__name__, cls.__only_on__) else: msg = "'%s' unsupported on any DB implementation %s%s" % ( cls.__name__, ", ".join( "'%s(%s)+%s'" % ( config_obj.db.name, ".".join( str(dig) for dig in config_obj.db.dialect.server_version_info), config_obj.db.driver ) for config_obj in config.Config.all_configs() ), ", ".join(reasons) ) raise SkipTest(msg) elif hasattr(cls, '__prefer_backends__'): non_preferred = set() spec = exclusions.db_spec(*util.to_list(cls.__prefer_backends__)) for config_obj in all_configs: if not spec(config_obj): non_preferred.add(config_obj) if all_configs.difference(non_preferred): all_configs.difference_update(non_preferred) if config._current not in all_configs: _setup_config(all_configs.pop(), cls)
def _do_skips(self, cls): from sqlalchemy.testing import config if hasattr(cls, '__requires__'): def test_suite(): return 'ok' test_suite.__name__ = cls.__name__ for requirement in cls.__requires__: check = getattr(config.requirements, requirement) if not check.enabled: raise SkipTest( check.reason if check.reason else ( "'%s' unsupported on DB implementation '%s'" % ( cls.__name__, config.db.name ) ) ) if cls.__unsupported_on__: spec = exclusions.db_spec(*cls.__unsupported_on__) if spec(config.db): raise SkipTest( "'%s' unsupported on DB implementation '%s'" % ( cls.__name__, config.db.name) ) if getattr(cls, '__only_on__', None): spec = exclusions.db_spec(*util.to_list(cls.__only_on__)) if not spec(config.db): raise SkipTest( "'%s' unsupported on DB implementation '%s'" % ( cls.__name__, config.db.name) ) if getattr(cls, '__skip_if__', False): for c in getattr(cls, '__skip_if__'): if c(): raise SkipTest("'%s' skipped by %s" % ( cls.__name__, c.__name__) ) for db, op, spec in getattr(cls, '__excluded_on__', ()): exclusions.exclude(db, op, spec, "'%s' unsupported on DB %s version %s" % ( cls.__name__, config.db.name, exclusions._server_version(config.db)))