Python sqlalchemy.util 模块,to_list() 实例源码

我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用sqlalchemy.util.to_list()

项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def only_on(dbs, reason=None):
    return only_if(
        OrPredicate([SpecPredicate(db) for db in util.to_list(dbs)])
    )
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def _compile(self, whereclause = None, **kwargs):
        order_by = kwargs.pop('order_by', False)
        if order_by is False:
            order_by = self.order_by
        if order_by is False:
            if self.table.default_order_by() is not None:
                order_by = self.table.default_order_by()

        if self._should_nest(**kwargs):
            s2 = sql.select(self.table.primary_key, whereclause, use_labels=True, **kwargs)
            if not kwargs.get('distinct', False) and order_by:
                s2.order_by(*util.to_list(order_by))
            s3 = s2.alias('rowcount')
            crit = []
            for i in range(0, len(self.table.primary_key)):
                crit.append(s3.primary_key[i] == self.table.primary_key[i])
            statement = sql.select([], sql.and_(*crit), from_obj=[self.table], use_labels=True)
            if order_by:
                statement.order_by(*util.to_list(order_by))
        else:
            statement = sql.select([], whereclause, from_obj=[self.table], use_labels=True, **kwargs)
            if order_by:
                statement.order_by(*util.to_list(order_by))
            # for a DISTINCT query, you need the columns explicitly specified in order
            # to use it in "order_by".  insure they are in the column criterion (particularly oid).
            # TODO: this should be done at the SQL level not the mapper level
            if kwargs.get('distinct', False) and order_by:
                statement.append_column(*util.to_list(order_by))
        # plugin point


        # give all the attached properties a chance to modify the query
        for key, value in self.props.iteritems():
            value.setup(key, statement, **kwargs) 
        return statement
项目:annotated-py-sqlalchemy    作者:hhstore    | 项目源码 | 文件源码
def setup(self, key, statement, recursion_stack = None, eagertable=None, **options):
        """add a left outer join to the statement thats being constructed"""

        if recursion_stack is None:
            recursion_stack = {}

        if statement.whereclause is not None:
            # "aliasize" the tables referenced in the user-defined whereclause to not 
            # collide with the tables used by the eager load
            # note that we arent affecting the mapper's table, nor our own primary or secondary joins
            aliasizer = Aliasizer(*self.to_alias)
            statement.whereclause.accept_visitor(aliasizer)
            for alias in aliasizer.aliases.values():
                statement.append_from(alias)

        if hasattr(statement, '_outerjoin'):
            towrap = statement._outerjoin
        else:
            towrap = self.parent.table

        if self.secondaryjoin is not None:
            statement._outerjoin = sql.outerjoin(towrap, self.secondary, self.eagerprimary).outerjoin(self.eagertarget, self.eagersecondary)
            if self.order_by is False and self.secondary.default_order_by() is not None:
                statement.order_by(*self.secondary.default_order_by())
        else:
            statement._outerjoin = towrap.outerjoin(self.eagertarget, self.eagerprimary)
            if self.order_by is False and self.eagertarget.default_order_by() is not None:
                statement.order_by(*self.eagertarget.default_order_by())

        if self.eager_order_by:
            statement.order_by(*util.to_list(self.eager_order_by))

        statement.append_from(statement._outerjoin)
        recursion_stack[self] = True
        try:
            for key, value in self.mapper.props.iteritems():
                if recursion_stack.has_key(value):
                    raise "Circular eager load relationship detected on " + str(self.mapper) + " " + key + repr(self.mapper.props)
                value.setup(key, statement, recursion_stack=recursion_stack, eagertable=self.eagertarget)
        finally:
            del recursion_stack[self]
项目:modelchemy    作者:rjusher    | 项目源码 | 文件源码
def get(self, ident, **kwargs):
        if self._shard_id is not None:
            return super(ShardedQuery, self).get(ident)
        else:
            ident = util.to_list(ident)
            for shard_id in self.id_chooser(self, ident):
                o = self.set_shard(shard_id).get(ident, **kwargs)
                if o is not None:
                    return o
            else:
                return None
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def only_on(dbs, reason=None):
    return only_if(
        OrPredicate([SpecPredicate(db) for db in util.to_list(dbs)])
    )
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def only_on(dbs, reason=None):
    return only_if(
        OrPredicate([Predicate.as_predicate(db) for db in util.to_list(dbs)])
    )
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def only_on(dbs, reason=None):
    return only_if(
        OrPredicate([Predicate.as_predicate(db) for db in util.to_list(dbs)])
    )
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def only_on(dbs, reason=None):
    return only_if(
        OrPredicate([SpecPredicate(db) for db in util.to_list(dbs)])
    )
项目:flask-maple    作者:honmaple    | 项目源码 | 文件源码
def _filter_or_exclude(self, negate, kwargs):
        q = self

        def negate_if(expr):
            return expr if not negate else ~expr

        column = None

        for arg, value in kwargs.items():
            for token in arg.split('__'):
                if column is None:
                    column = _entity_descriptor(q._joinpoint_zero(), token)
                    if column.impl.uses_objects:
                        q = q.join(column)
                        column = None
                elif token in ['in']:
                    op = self._underscore_operators[token]
                    q = q.filter(negate_if(op(column, value)))
                    column = None
                elif token in self._underscore_operators:
                    op = self._underscore_operators[token]
                    q = q.filter(negate_if(op(column, *to_list(value))))
                    column = None
                else:
                    raise ValueError('No idea what to do with %r' % token)
            if column is not None:
                q = q.filter(negate_if(column == value))
                column = None
            q = q.reset_joinpoint()
        return q
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def only_on(dbs, reason=None):
    return only_if(
        OrPredicate([Predicate.as_predicate(db) for db in util.to_list(dbs)])
    )
项目:maple-file    作者:honmaple    | 项目源码 | 文件源码
def _filter_or_exclude(self, negate, kwargs):
        q = self

        def negate_if(expr):
            return expr if not negate else ~expr

        column = None

        for arg, value in kwargs.items():
            for token in arg.split('__'):
                if column is None:
                    column = _entity_descriptor(q._joinpoint_zero(), token)
                    if column.impl.uses_objects:
                        q = q.join(column)
                        column = None
                elif token in ['in']:
                    op = self._underscore_operators[token]
                    q = q.filter(negate_if(op(column, value)))
                    column = None
                elif token in self._underscore_operators:
                    op = self._underscore_operators[token]
                    q = q.filter(negate_if(op(column, *to_list(value))))
                    column = None
                else:
                    raise ValueError('No idea what to do with %r' % token)
            if column is not None:
                q = q.filter(negate_if(column == value))
                column = None
            q = q.reset_joinpoint()
        return q
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def only_on(dbs, reason=None):
    return only_if(
        OrPredicate([Predicate.as_predicate(db) for db in util.to_list(dbs)])
    )
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def only_on(dbs, reason=None):
    return only_if(
        OrPredicate([Predicate.as_predicate(db) for db in util.to_list(dbs)])
    )
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def only_on(dbs, reason=None):
    return only_if(
        OrPredicate([Predicate.as_predicate(db) for db in util.to_list(dbs)])
    )
项目:ngx_status    作者:YoYoAdorkable    | 项目源码 | 文件源码
def only_on(dbs, reason=None):
    return only_if(
        OrPredicate([Predicate.as_predicate(db) for db in util.to_list(dbs)])
    )
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _possible_configs_for_cls(cls, reasons=None):
    all_configs = set(config.Config.all_configs())
    if cls.__unsupported_on__:
        spec = exclusions.db_spec(*cls.__unsupported_on__)
        for config_obj in list(all_configs):
            if spec(config_obj):
                all_configs.remove(config_obj)
    if getattr(cls, '__only_on__', None):
        spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
        for config_obj in list(all_configs):
            if not spec(config_obj):
                all_configs.remove(config_obj)

    if hasattr(cls, '__requires__'):
        requirements = config.requirements
        for config_obj in list(all_configs):
            for requirement in cls.__requires__:
                check = getattr(requirements, requirement)

                skip_reasons = check.matching_config_reasons(config_obj)
                if skip_reasons:
                    all_configs.remove(config_obj)
                    if reasons is not None:
                        reasons.extend(skip_reasons)
                    break

    if hasattr(cls, '__prefer_requires__'):
        non_preferred = set()
        requirements = config.requirements
        for config_obj in list(all_configs):
            for requirement in cls.__prefer_requires__:
                check = getattr(requirements, requirement)

                if not check.enabled_for_config(config_obj):
                    non_preferred.add(config_obj)
        if all_configs.difference(non_preferred):
            all_configs.difference_update(non_preferred)

    for db_spec, op, spec in getattr(cls, '__excluded_on__', ()):
        for config_obj in list(all_configs):
            if not exclusions.skip_if(
                    exclusions.SpecPredicate(db_spec, op, spec)
            ).enabled_for_config(config_obj):
                all_configs.remove(config_obj)

    return all_configs
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _do_skips(cls):
    reasons = []
    all_configs = _possible_configs_for_cls(cls, reasons)

    if getattr(cls, '__skip_if__', False):
        for c in getattr(cls, '__skip_if__'):
            if c():
                raise SkipTest("'%s' skipped by %s" % (
                    cls.__name__, c.__name__)
                )

    if not all_configs:
        if getattr(cls, '__backend__', False):
            msg = "'%s' unsupported for implementation '%s'" % (
                cls.__name__, cls.__only_on__)
        else:
            msg = "'%s' unsupported on any DB implementation %s%s" % (
                cls.__name__,
                ", ".join(
                    "'%s(%s)+%s'" % (
                        config_obj.db.name,
                        ".".join(
                            str(dig) for dig in
                            config_obj.db.dialect.server_version_info),
                        config_obj.db.driver
                    )
                  for config_obj in config.Config.all_configs()
                ),
                ", ".join(reasons)
            )
        raise SkipTest(msg)
    elif hasattr(cls, '__prefer_backends__'):
        non_preferred = set()
        spec = exclusions.db_spec(*util.to_list(cls.__prefer_backends__))
        for config_obj in all_configs:
            if not spec(config_obj):
                non_preferred.add(config_obj)
        if all_configs.difference(non_preferred):
            all_configs.difference_update(non_preferred)

    if config._current not in all_configs:
        _setup_config(all_configs.pop(), cls)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _possible_configs_for_cls(cls, reasons=None):
    all_configs = set(config.Config.all_configs())
    if cls.__unsupported_on__:
        spec = exclusions.db_spec(*cls.__unsupported_on__)
        for config_obj in list(all_configs):
            if spec(config_obj):
                all_configs.remove(config_obj)
    if getattr(cls, '__only_on__', None):
        spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
        for config_obj in list(all_configs):
            if not spec(config_obj):
                all_configs.remove(config_obj)

    if hasattr(cls, '__requires__'):
        requirements = config.requirements
        for config_obj in list(all_configs):
            for requirement in cls.__requires__:
                check = getattr(requirements, requirement)

                skip_reasons = check.matching_config_reasons(config_obj)
                if skip_reasons:
                    all_configs.remove(config_obj)
                    if reasons is not None:
                        reasons.extend(skip_reasons)
                    break

    if hasattr(cls, '__prefer_requires__'):
        non_preferred = set()
        requirements = config.requirements
        for config_obj in list(all_configs):
            for requirement in cls.__prefer_requires__:
                check = getattr(requirements, requirement)

                if not check.enabled_for_config(config_obj):
                    non_preferred.add(config_obj)
        if all_configs.difference(non_preferred):
            all_configs.difference_update(non_preferred)

    for db_spec, op, spec in getattr(cls, '__excluded_on__', ()):
        for config_obj in list(all_configs):
            if not exclusions.skip_if(
                    exclusions.SpecPredicate(db_spec, op, spec)
            ).enabled_for_config(config_obj):
                all_configs.remove(config_obj)

    return all_configs
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _do_skips(cls):
    reasons = []
    all_configs = _possible_configs_for_cls(cls, reasons)

    if getattr(cls, '__skip_if__', False):
        for c in getattr(cls, '__skip_if__'):
            if c():
                raise SkipTest("'%s' skipped by %s" % (
                    cls.__name__, c.__name__)
                )

    if not all_configs:
        if getattr(cls, '__backend__', False):
            msg = "'%s' unsupported for implementation '%s'" % (
                cls.__name__, cls.__only_on__)
        else:
            msg = "'%s' unsupported on any DB implementation %s%s" % (
                cls.__name__,
                ", ".join(
                    "'%s(%s)+%s'" % (
                        config_obj.db.name,
                        ".".join(
                            str(dig) for dig in
                            config_obj.db.dialect.server_version_info),
                        config_obj.db.driver
                    )
                  for config_obj in config.Config.all_configs()
                ),
                ", ".join(reasons)
            )
        raise SkipTest(msg)
    elif hasattr(cls, '__prefer_backends__'):
        non_preferred = set()
        spec = exclusions.db_spec(*util.to_list(cls.__prefer_backends__))
        for config_obj in all_configs:
            if not spec(config_obj):
                non_preferred.add(config_obj)
        if all_configs.difference(non_preferred):
            all_configs.difference_update(non_preferred)

    if config._current not in all_configs:
        _setup_config(all_configs.pop(), cls)
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _do_skips(self, cls):
        from sqlalchemy.testing import config
        if hasattr(cls, '__requires__'):
            def test_suite():
                return 'ok'
            test_suite.__name__ = cls.__name__
            for requirement in cls.__requires__:
                check = getattr(config.requirements, requirement)

                if not check.enabled:
                    raise SkipTest(
                        check.reason if check.reason
                        else
                        (
                            "'%s' unsupported on DB implementation '%s'" % (
                                cls.__name__, config.db.name
                            )
                        )
                    )

        if cls.__unsupported_on__:
            spec = exclusions.db_spec(*cls.__unsupported_on__)
            if spec(config.db):
                raise SkipTest(
                    "'%s' unsupported on DB implementation '%s'" % (
                     cls.__name__, config.db.name)
                    )

        if getattr(cls, '__only_on__', None):
            spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
            if not spec(config.db):
                raise SkipTest(
                    "'%s' unsupported on DB implementation '%s'" % (
                     cls.__name__, config.db.name)
                    )

        if getattr(cls, '__skip_if__', False):
            for c in getattr(cls, '__skip_if__'):
                if c():
                    raise SkipTest("'%s' skipped by %s" % (
                        cls.__name__, c.__name__)
                    )

        for db, op, spec in getattr(cls, '__excluded_on__', ()):
            exclusions.exclude(db, op, spec,
                    "'%s' unsupported on DB %s version %s" % (
                    cls.__name__, config.db.name,
                    exclusions._server_version(config.db)))