Python sqlalchemy 模块,inspect() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.inspect()

项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def __init__(self, entity, equivalents=None, adapt_required=False,
                 chain_to=None, allow_label_resolve=True,
                 anonymize_labels=False):
        info = inspection.inspect(entity)

        self.mapper = info.mapper
        selectable = info.selectable
        is_aliased_class = info.is_aliased_class
        if is_aliased_class:
            self.aliased_class = entity
        else:
            self.aliased_class = None

        sql_util.ColumnAdapter.__init__(
            self, selectable, equivalents, chain_to,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=self._include_fn
        )
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _test_get_table_names(self, schema=None, table_type='table',
                              order_by=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        insp = inspect(meta.bind)

        if table_type == 'view':
            table_names = insp.get_view_names(schema)
            table_names.sort()
            answer = ['email_addresses_v', 'users_v']
            eq_(sorted(table_names), answer)
        else:
            table_names = insp.get_table_names(schema,
                                               order_by=order_by)
            if order_by == 'foreign_key':
                answer = ['users', 'email_addresses', 'dingalings']
                eq_(table_names, answer)
            else:
                answer = ['dingalings', 'email_addresses', 'users']
                eq_(sorted(table_names), answer)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def force_drop_names(*names):
    """Force the given table names to be dropped after test complete,
    isolating for foreign key cycles

    """
    from . import config
    from sqlalchemy import inspect

    @decorator
    def go(fn, *args, **kw):

        try:
            return fn(*args, **kw)
        finally:
            drop_all_tables(
                config.db, inspect(config.db), include_names=names)
    return go
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def __init__(self, entity, equivalents=None, adapt_required=False,
                 chain_to=None, allow_label_resolve=True,
                 anonymize_labels=False):
        info = inspection.inspect(entity)

        self.mapper = info.mapper
        selectable = info.selectable
        is_aliased_class = info.is_aliased_class
        if is_aliased_class:
            self.aliased_class = entity
        else:
            self.aliased_class = None

        sql_util.ColumnAdapter.__init__(
            self, selectable, equivalents, chain_to,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=self._include_fn
        )
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def _test_get_table_names(self, schema=None, table_type='table',
                              order_by=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        insp = inspect(meta.bind)

        if table_type == 'view':
            table_names = insp.get_view_names(schema)
            table_names.sort()
            answer = ['email_addresses_v', 'users_v']
            eq_(sorted(table_names), answer)
        else:
            table_names = insp.get_table_names(schema,
                                               order_by=order_by)
            if order_by == 'foreign_key':
                answer = ['users', 'email_addresses', 'dingalings']
                eq_(table_names, answer)
            else:
                answer = ['dingalings', 'email_addresses', 'users']
                eq_(sorted(table_names), answer)
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def force_drop_names(*names):
    """Force the given table names to be dropped after test complete,
    isolating for foreign key cycles

    """
    from . import config
    from sqlalchemy import inspect

    @decorator
    def go(fn, *args, **kw):

        try:
            return fn(*args, **kw)
        finally:
            drop_all_tables(
                config.db, inspect(config.db), include_names=names)
    return go
项目:sql-data-dependency    作者:rkeilty    | 项目源码 | 文件源码
def __init__(self, server=None, database=None, username=None, password=None, port=None):
        self._server = server
        self._database = database
        self._username = username
        self._password = password
        self._port = port

        # Make these global for now
        self.engine = create_engine('mysql://{username}:{password}@{server}:{port}/{database}'.format(
            username=self._username,
            password=self._password,
            server=self._server,
            port=self._port,
            database=self._database
        ))
        self.inspector = inspect(self.engine)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _test_get_table_names(self, schema=None, table_type='table',
                              order_by=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        insp = inspect(meta.bind)
        if table_type == 'view':
            table_names = insp.get_view_names(schema)
            table_names.sort()
            answer = ['email_addresses_v', 'users_v']
            eq_(sorted(table_names), answer)
        else:
            table_names = insp.get_table_names(schema,
                                               order_by=order_by)
            if order_by == 'foreign_key':
                answer = ['users', 'email_addresses', 'dingalings']
                eq_(table_names, answer)
            else:
                answer = ['dingalings', 'email_addresses', 'users']
                eq_(sorted(table_names), answer)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _test_get_indexes(self, schema=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        # The database may decide to create indexes for foreign keys, etc.
        # so there may be more indexes than expected.
        insp = inspect(meta.bind)
        indexes = insp.get_indexes('users', schema=schema)
        expected_indexes = [
            {'unique': False,
             'column_names': ['test1', 'test2'],
             'name': 'users_t_idx'},
            {'unique': False,
             'column_names': ['user_id', 'test2', 'test1'],
             'name': 'users_all_idx'}
        ]
        index_names = [d['name'] for d in indexes]
        for e_index in expected_indexes:
            assert e_index['name'] in index_names
            index = indexes[index_names.index(e_index['name'])]
            for key in e_index:
                eq_(e_index[key], index[key])
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def test_autoincrement_col(self):
        """test that 'autoincrement' is reflected according to sqla's policy.

        Don't mark this test as unsupported for any backend !

        (technically it fails with MySQL InnoDB since "id" comes before "id2")

        A backend is better off not returning "autoincrement" at all,
        instead of potentially returning "False" for an auto-incrementing
        primary key column.

        """

        meta = self.metadata
        insp = inspect(meta.bind)

        for tname, cname in [
            ('users', 'user_id'),
            ('email_addresses', 'address_id'),
            ('dingalings', 'dingaling_id'),
        ]:
            cols = insp.get_columns(tname)
            id_ = dict((c['name'], c) for c in cols)[cname]
            assert id_.get('autoincrement', True)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _test_get_table_names(self, schema=None, table_type='table',
                              order_by=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        insp = inspect(meta.bind)
        if table_type == 'view':
            table_names = insp.get_view_names(schema)
            table_names.sort()
            answer = ['email_addresses_v', 'users_v']
            eq_(sorted(table_names), answer)
        else:
            table_names = insp.get_table_names(schema,
                                               order_by=order_by)
            if order_by == 'foreign_key':
                answer = ['users', 'email_addresses', 'dingalings']
                eq_(table_names, answer)
            else:
                answer = ['dingalings', 'email_addresses', 'users']
                eq_(sorted(table_names), answer)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _test_get_indexes(self, schema=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        # The database may decide to create indexes for foreign keys, etc.
        # so there may be more indexes than expected.
        insp = inspect(meta.bind)
        indexes = insp.get_indexes('users', schema=schema)
        expected_indexes = [
            {'unique': False,
             'column_names': ['test1', 'test2'],
             'name': 'users_t_idx'},
            {'unique': False,
             'column_names': ['user_id', 'test2', 'test1'],
             'name': 'users_all_idx'}
        ]
        index_names = [d['name'] for d in indexes]
        for e_index in expected_indexes:
            assert e_index['name'] in index_names
            index = indexes[index_names.index(e_index['name'])]
            for key in e_index:
                eq_(e_index[key], index[key])
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_autoincrement_col(self):
        """test that 'autoincrement' is reflected according to sqla's policy.

        Don't mark this test as unsupported for any backend !

        (technically it fails with MySQL InnoDB since "id" comes before "id2")

        A backend is better off not returning "autoincrement" at all,
        instead of potentially returning "False" for an auto-incrementing
        primary key column.

        """

        meta = self.metadata
        insp = inspect(meta.bind)

        for tname, cname in [
            ('users', 'user_id'),
            ('email_addresses', 'address_id'),
            ('dingalings', 'dingaling_id'),
        ]:
            cols = insp.get_columns(tname)
            id_ = dict((c['name'], c) for c in cols)[cname]
            assert id_.get('autoincrement', True)
项目:saas-api-boilerplate    作者:rgant    | 项目源码 | 文件源码
def __repr__(self):
        """
        Custom representation for this Model. Data that isn't currently loaded won't be fetched
        from DB, shows <not loaded> instead.
        Based on:
        https://github.com/kvesteri/sqlalchemy-utils/blob/0.32.14/sqlalchemy_utils/models.py#L41
        Customized to use our __str__ for fully qualified class name.
        :return str: python representation for creating this model.
        """
        state = sa.inspect(self)
        field_reprs = []
        fields = state.mapper.columns.keys()
        for key in fields:
            value = state.attrs[key].loaded_value
            if value == NO_VALUE:
                value = '<not loaded>'
            else:
                value = repr(value)
            field_reprs.append('='.join((key, value)))

        return "{0}({1})".format(self, ', '.join(field_reprs))
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def __init__(self, entity, equivalents=None, adapt_required=False,
                 chain_to=None, allow_label_resolve=True,
                 anonymize_labels=False):
        info = inspection.inspect(entity)

        self.mapper = info.mapper
        selectable = info.selectable
        is_aliased_class = info.is_aliased_class
        if is_aliased_class:
            self.aliased_class = entity
        else:
            self.aliased_class = None

        sql_util.ColumnAdapter.__init__(
            self, selectable, equivalents, chain_to,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=self._include_fn
        )
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def _test_get_table_names(self, schema=None, table_type='table',
                              order_by=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        insp = inspect(meta.bind)

        if table_type == 'view':
            table_names = insp.get_view_names(schema)
            table_names.sort()
            answer = ['email_addresses_v', 'users_v']
            eq_(sorted(table_names), answer)
        else:
            table_names = insp.get_table_names(schema,
                                               order_by=order_by)
            if order_by == 'foreign_key':
                answer = ['users', 'email_addresses', 'dingalings']
                eq_(table_names, answer)
            else:
                answer = ['dingalings', 'email_addresses', 'users']
                eq_(sorted(table_names), answer)
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def force_drop_names(*names):
    """Force the given table names to be dropped after test complete,
    isolating for foreign key cycles

    """
    from . import config
    from sqlalchemy import inspect

    @decorator
    def go(fn, *args, **kw):

        try:
            return fn(*args, **kw)
        finally:
            drop_all_tables(
                config.db, inspect(config.db), include_names=names)
    return go
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def __init__(self, entity, equivalents=None, adapt_required=False,
                 chain_to=None, allow_label_resolve=True,
                 anonymize_labels=False):
        info = inspection.inspect(entity)

        self.mapper = info.mapper
        selectable = info.selectable
        is_aliased_class = info.is_aliased_class
        if is_aliased_class:
            self.aliased_class = entity
        else:
            self.aliased_class = None

        sql_util.ColumnAdapter.__init__(
            self, selectable, equivalents, chain_to,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=self._include_fn
        )
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def _test_get_table_names(self, schema=None, table_type='table',
                              order_by=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        insp = inspect(meta.bind)

        if table_type == 'view':
            table_names = insp.get_view_names(schema)
            table_names.sort()
            answer = ['email_addresses_v', 'users_v']
            eq_(sorted(table_names), answer)
        else:
            table_names = insp.get_table_names(schema,
                                               order_by=order_by)
            if order_by == 'foreign_key':
                answer = ['users', 'email_addresses', 'dingalings']
                eq_(table_names, answer)
            else:
                answer = ['dingalings', 'email_addresses', 'users']
                eq_(sorted(table_names), answer)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def _test_get_indexes(self, schema=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        # The database may decide to create indexes for foreign keys, etc.
        # so there may be more indexes than expected.
        insp = inspect(meta.bind)
        indexes = insp.get_indexes('users', schema=schema)
        expected_indexes = [
            {'unique': False,
             'column_names': ['test1', 'test2'],
             'name': 'users_t_idx'},
            {'unique': False,
             'column_names': ['user_id', 'test2', 'test1'],
             'name': 'users_all_idx'}
        ]
        index_names = [d['name'] for d in indexes]
        for e_index in expected_indexes:
            assert e_index['name'] in index_names
            index = indexes[index_names.index(e_index['name'])]
            for key in e_index:
                eq_(e_index[key], index[key])
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_autoincrement_col(self):
        """test that 'autoincrement' is reflected according to sqla's policy.

        Don't mark this test as unsupported for any backend !

        (technically it fails with MySQL InnoDB since "id" comes before "id2")

        A backend is better off not returning "autoincrement" at all,
        instead of potentially returning "False" for an auto-incrementing
        primary key column.

        """

        meta = self.metadata
        insp = inspect(meta.bind)

        for tname, cname in [
            ('users', 'user_id'),
            ('email_addresses', 'address_id'),
            ('dingalings', 'dingaling_id'),
        ]:
            cols = insp.get_columns(tname)
            id_ = dict((c['name'], c) for c in cols)[cname]
            assert id_.get('autoincrement', True)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def force_drop_names(*names):
    """Force the given table names to be dropped after test complete,
    isolating for foreign key cycles

    """
    from . import config
    from sqlalchemy import inspect

    @decorator
    def go(fn, *args, **kw):

        try:
            return fn(*args, **kw)
        finally:
            drop_all_tables(
                config.db, inspect(config.db), include_names=names)
    return go
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def unique_constraint_reflection(self):
        def doesnt_have_check_uq_constraints(config):
            if not util.sqla_084:
                return True
            from sqlalchemy import inspect
            insp = inspect(config.db)
            try:
                insp.get_unique_constraints('x')
            except NotImplementedError:
                return True
            except TypeError:
                return True
            except Exception:
                pass
            return False

        return exclusions.skip_if(
            lambda config: not util.sqla_084,
            "SQLAlchemy 0.8.4 or greater required"
        ) + exclusions.skip_if(doesnt_have_check_uq_constraints)
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def object_mapper(instance):
    """Given an object, return the primary Mapper associated with the object
    instance.

    Raises :class:`sqlalchemy.orm.exc.UnmappedInstanceError`
    if no mapping is configured.

    This function is available via the inspection system as::

        inspect(instance).mapper

    Using the inspection system will raise
    :class:`sqlalchemy.exc.NoInspectionAvailable` if the instance is
    not part of a mapping.

    """
    return object_state(instance).mapper
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def class_mapper(class_, configure=True):
    """Given a class, return the primary :class:`.Mapper` associated
    with the key.

    Raises :class:`.UnmappedClassError` if no mapping is configured
    on the given class, or :class:`.ArgumentError` if a non-class
    object is passed.

    Equivalent functionality is available via the :func:`.inspect`
    function as::

        inspect(some_mapped_class)

    Using the inspection system will raise
    :class:`sqlalchemy.exc.NoInspectionAvailable` if the class is not mapped.

    """
    mapper = _inspect_mapped_class(class_, configure=configure)
    if mapper is None:
        if not isinstance(class_, type):
            raise sa_exc.ArgumentError(
                    "Class object expected, got '%r'." % class_)
        raise exc.UnmappedClassError(class_)
    else:
        return mapper
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _test_get_indexes(self, schema=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
                    self.tables.email_addresses, self.tables.dingalings
        # The database may decide to create indexes for foreign keys, etc.
        # so there may be more indexes than expected.
        insp = inspect(meta.bind)
        indexes = insp.get_indexes('users', schema=schema)
        expected_indexes = [
            {'unique': False,
             'column_names': ['test1', 'test2'],
             'name': 'users_t_idx'},
            {'unique': False,
             'column_names': ['user_id', 'test2', 'test1'],
             'name': 'users_all_idx'}
        ]
        index_names = [d['name'] for d in indexes]
        for e_index in expected_indexes:
            assert e_index['name'] in index_names
            index = indexes[index_names.index(e_index['name'])]
            for key in e_index:
                eq_(e_index[key], index[key])
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def test_autoincrement_col(self):
        """test that 'autoincrement' is reflected according to sqla's policy.

        Don't mark this test as unsupported for any backend !

        (technically it fails with MySQL InnoDB since "id" comes before "id2")

        A backend is better off not returning "autoincrement" at all,
        instead of potentially returning "False" for an auto-incrementing
        primary key column.

        """

        meta = self.metadata
        insp = inspect(meta.bind)

        for tname, cname in [
                ('users', 'user_id'),
                ('email_addresses', 'address_id'),
                ('dingalings', 'dingaling_id'),
            ]:
            cols = insp.get_columns(tname)
            id_ = dict((c['name'], c) for c in cols)[cname]
            assert id_.get('autoincrement', True)
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def __init__(self, entity, equivalents=None, adapt_required=False,
                 chain_to=None, allow_label_resolve=True,
                 anonymize_labels=False):
        info = inspection.inspect(entity)

        self.mapper = info.mapper
        selectable = info.selectable
        is_aliased_class = info.is_aliased_class
        if is_aliased_class:
            self.aliased_class = entity
        else:
            self.aliased_class = None

        sql_util.ColumnAdapter.__init__(
            self, selectable, equivalents, chain_to,
            adapt_required=adapt_required,
            allow_label_resolve=allow_label_resolve,
            anonymize_labels=anonymize_labels,
            include_fn=self._include_fn
        )
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def _test_get_table_names(self, schema=None, table_type='table',
                              order_by=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        insp = inspect(meta.bind)

        if table_type == 'view':
            table_names = insp.get_view_names(schema)
            table_names.sort()
            answer = ['email_addresses_v', 'users_v']
            eq_(sorted(table_names), answer)
        else:
            table_names = insp.get_table_names(schema,
                                               order_by=order_by)
            if order_by == 'foreign_key':
                answer = ['users', 'email_addresses', 'dingalings']
                eq_(table_names, answer)
            else:
                answer = ['dingalings', 'email_addresses', 'users']
                eq_(sorted(table_names), answer)
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def _test_get_indexes(self, schema=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        # The database may decide to create indexes for foreign keys, etc.
        # so there may be more indexes than expected.
        insp = inspect(meta.bind)
        indexes = insp.get_indexes('users', schema=schema)
        expected_indexes = [
            {'unique': False,
             'column_names': ['test1', 'test2'],
             'name': 'users_t_idx'},
            {'unique': False,
             'column_names': ['user_id', 'test2', 'test1'],
             'name': 'users_all_idx'}
        ]
        index_names = [d['name'] for d in indexes]
        for e_index in expected_indexes:
            assert e_index['name'] in index_names
            index = indexes[index_names.index(e_index['name'])]
            for key in e_index:
                eq_(e_index[key], index[key])
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def test_autoincrement_col(self):
        """test that 'autoincrement' is reflected according to sqla's policy.

        Don't mark this test as unsupported for any backend !

        (technically it fails with MySQL InnoDB since "id" comes before "id2")

        A backend is better off not returning "autoincrement" at all,
        instead of potentially returning "False" for an auto-incrementing
        primary key column.

        """

        meta = self.metadata
        insp = inspect(meta.bind)

        for tname, cname in [
            ('users', 'user_id'),
            ('email_addresses', 'address_id'),
            ('dingalings', 'dingaling_id'),
        ]:
            cols = insp.get_columns(tname)
            id_ = dict((c['name'], c) for c in cols)[cname]
            assert id_.get('autoincrement', True)
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def force_drop_names(*names):
    """Force the given table names to be dropped after test complete,
    isolating for foreign key cycles

    """
    from . import config
    from sqlalchemy import inspect

    @decorator
    def go(fn, *args, **kw):

        try:
            return fn(*args, **kw)
        finally:
            drop_all_tables(
                config.db, inspect(config.db), include_names=names)
    return go
项目:specstore    作者:datahq    | 项目源码 | 文件源码
def object_as_dict(obj):
        return {c.key: getattr(obj, c.key)
                for c in inspect(obj).mapper.column_attrs}
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def parent(self):
        """Return an inspection instance representing the parent.

        This will be either an instance of :class:`.Mapper`
        or :class:`.AliasedInsp`, depending upon the nature
        of the parent entity which this attribute is associated
        with.

        """
        return inspection.inspect(self._parententity)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def stop_test_class(cls):
    #from sqlalchemy import inspect
    #assert not inspect(testing.db).get_table_names()
    engines.testing_reaper._stop_test_ctx()
    if not options.low_connections:
        assertions.global_cleanup_assertions()
    _restore_engine()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_schema_names(self):
        insp = inspect(testing.db)

        self.assert_(testing.config.test_schema in insp.get_schema_names())
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_default_schema_name(self):
        insp = inspect(testing.db)
        eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_temp_table_names(self):
        insp = inspect(self.bind)
        temp_table_names = insp.get_temp_table_names()
        eq_(sorted(temp_table_names), ['user_tmp'])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_temp_view_names(self):
        insp = inspect(self.bind)
        temp_table_names = insp.get_temp_view_names()
        eq_(sorted(temp_table_names), ['user_tmp_v'])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_nullable_reflection(self):
        t = Table('t', self.metadata,
                  Column('a', Integer, nullable=True),
                  Column('b', Integer, nullable=False))
        t.create()
        eq_(
            dict(
                (col['name'], col['nullable'])
                for col in inspect(self.metadata.bind).get_columns('t')
            ),
            {"a": True, "b": False}
        )
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_temp_table_columns(self):
        meta = MetaData(self.bind)
        user_tmp = self.tables.user_tmp
        insp = inspect(meta.bind)
        cols = insp.get_columns('user_tmp')
        self.assert_(len(cols) > 0, len(cols))

        for i, col in enumerate(user_tmp.columns):
            eq_(col.name, cols[i]['name'])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_temp_view_columns(self):
        insp = inspect(self.bind)
        cols = insp.get_columns('user_tmp_v')
        eq_(
            [col['name'] for col in cols],
            ['id', 'name', 'foo']
        )
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _test_get_pk_constraint(self, schema=None):
        meta = self.metadata
        users, addresses = self.tables.users, self.tables.email_addresses
        insp = inspect(meta.bind)

        users_cons = insp.get_pk_constraint(users.name, schema=schema)
        users_pkeys = users_cons['constrained_columns']
        eq_(users_pkeys, ['user_id'])

        addr_cons = insp.get_pk_constraint(addresses.name, schema=schema)
        addr_pkeys = addr_cons['constrained_columns']
        eq_(addr_pkeys, ['address_id'])

        with testing.requires.reflects_pk_names.fail_if():
            eq_(addr_cons['name'], 'email_ad_pk')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _test_get_foreign_keys(self, schema=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        insp = inspect(meta.bind)
        expected_schema = schema
        # users

        if testing.requires.self_referential_foreign_keys.enabled:
            users_fkeys = insp.get_foreign_keys(users.name,
                                                schema=schema)
            fkey1 = users_fkeys[0]

            with testing.requires.named_constraints.fail_if():
                self.assert_(fkey1['name'] is not None)

            eq_(fkey1['referred_schema'], expected_schema)
            eq_(fkey1['referred_table'], users.name)
            eq_(fkey1['referred_columns'], ['user_id', ])
            if testing.requires.self_referential_foreign_keys.enabled:
                eq_(fkey1['constrained_columns'], ['parent_user_id'])

        # addresses
        addr_fkeys = insp.get_foreign_keys(addresses.name,
                                           schema=schema)
        fkey1 = addr_fkeys[0]

        with testing.requires.named_constraints.fail_if():
            self.assert_(fkey1['name'] is not None)

        eq_(fkey1['referred_schema'], expected_schema)
        eq_(fkey1['referred_table'], users.name)
        eq_(fkey1['referred_columns'], ['user_id', ])
        eq_(fkey1['constrained_columns'], ['remote_user_id'])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_temp_table_unique_constraints(self):
        insp = inspect(self.bind)
        reflected = insp.get_unique_constraints('user_tmp')
        for refl in reflected:
            # Different dialects handle duplicate index and constraints
            # differently, so ignore this flag
            refl.pop('duplicates_index', None)
        eq_(reflected, [{'column_names': ['name'], 'name': 'user_tmp_uq'}])
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_get_temp_table_indexes(self):
        insp = inspect(self.bind)
        indexes = insp.get_indexes('user_tmp')
        for ind in indexes:
            ind.pop('dialect_options', None)
        eq_(
            # TODO: we need to add better filtering for indexes/uq constraints
            # that are doubled up
            [idx for idx in indexes if idx['name'] == 'user_tmp_ix'],
            [{'unique': False, 'column_names': ['foo'], 'name': 'user_tmp_ix'}]
        )
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _test_get_view_definition(self, schema=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        view_name1 = 'users_v'
        view_name2 = 'email_addresses_v'
        insp = inspect(meta.bind)
        v1 = insp.get_view_definition(view_name1, schema=schema)
        self.assert_(v1)
        v2 = insp.get_view_definition(view_name2, schema=schema)
        self.assert_(v2)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _test_get_table_oid(self, table_name, schema=None):
        meta = self.metadata
        users, addresses, dingalings = self.tables.users, \
            self.tables.email_addresses, self.tables.dingalings
        insp = inspect(meta.bind)
        oid = insp.get_table_oid(table_name, schema)
        self.assert_(isinstance(oid, int))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def stop_test_class(cls):
    #from sqlalchemy import inspect
    #assert not inspect(testing.db).get_table_names()
    _restore_engine()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def record_ops(session, flush_context=None, instances=None):
        try:
            d = session._model_changes
        except AttributeError:
            return

        for targets, operation in ((session.new, 'insert'), (session.dirty, 'update'), (session.deleted, 'delete')):
            for target in targets:
                state = inspect(target)
                key = state.identity_key if state.has_identity else id(target)
                d[key] = (target, operation)