我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.inspect()。
def __init__(self, entity, equivalents=None, adapt_required=False, chain_to=None, allow_label_resolve=True, anonymize_labels=False): info = inspection.inspect(entity) self.mapper = info.mapper selectable = info.selectable is_aliased_class = info.is_aliased_class if is_aliased_class: self.aliased_class = entity else: self.aliased_class = None sql_util.ColumnAdapter.__init__( self, selectable, equivalents, chain_to, adapt_required=adapt_required, allow_label_resolve=allow_label_resolve, anonymize_labels=anonymize_labels, include_fn=self._include_fn )
def _test_get_table_names(self, schema=None, table_type='table', order_by=None): meta = self.metadata users, addresses, dingalings = self.tables.users, \ self.tables.email_addresses, self.tables.dingalings insp = inspect(meta.bind) if table_type == 'view': table_names = insp.get_view_names(schema) table_names.sort() answer = ['email_addresses_v', 'users_v'] eq_(sorted(table_names), answer) else: table_names = insp.get_table_names(schema, order_by=order_by) if order_by == 'foreign_key': answer = ['users', 'email_addresses', 'dingalings'] eq_(table_names, answer) else: answer = ['dingalings', 'email_addresses', 'users'] eq_(sorted(table_names), answer)
def force_drop_names(*names): """Force the given table names to be dropped after test complete, isolating for foreign key cycles """ from . import config from sqlalchemy import inspect @decorator def go(fn, *args, **kw): try: return fn(*args, **kw) finally: drop_all_tables( config.db, inspect(config.db), include_names=names) return go
def __init__(self, server=None, database=None, username=None, password=None, port=None): self._server = server self._database = database self._username = username self._password = password self._port = port # Make these global for now self.engine = create_engine('mysql://{username}:{password}@{server}:{port}/{database}'.format( username=self._username, password=self._password, server=self._server, port=self._port, database=self._database )) self.inspector = inspect(self.engine)
def _test_get_indexes(self, schema=None): meta = self.metadata users, addresses, dingalings = self.tables.users, \ self.tables.email_addresses, self.tables.dingalings # The database may decide to create indexes for foreign keys, etc. # so there may be more indexes than expected. insp = inspect(meta.bind) indexes = insp.get_indexes('users', schema=schema) expected_indexes = [ {'unique': False, 'column_names': ['test1', 'test2'], 'name': 'users_t_idx'}, {'unique': False, 'column_names': ['user_id', 'test2', 'test1'], 'name': 'users_all_idx'} ] index_names = [d['name'] for d in indexes] for e_index in expected_indexes: assert e_index['name'] in index_names index = indexes[index_names.index(e_index['name'])] for key in e_index: eq_(e_index[key], index[key])
def test_autoincrement_col(self): """test that 'autoincrement' is reflected according to sqla's policy. Don't mark this test as unsupported for any backend ! (technically it fails with MySQL InnoDB since "id" comes before "id2") A backend is better off not returning "autoincrement" at all, instead of potentially returning "False" for an auto-incrementing primary key column. """ meta = self.metadata insp = inspect(meta.bind) for tname, cname in [ ('users', 'user_id'), ('email_addresses', 'address_id'), ('dingalings', 'dingaling_id'), ]: cols = insp.get_columns(tname) id_ = dict((c['name'], c) for c in cols)[cname] assert id_.get('autoincrement', True)
def __repr__(self): """ Custom representation for this Model. Data that isn't currently loaded won't be fetched from DB, shows <not loaded> instead. Based on: https://github.com/kvesteri/sqlalchemy-utils/blob/0.32.14/sqlalchemy_utils/models.py#L41 Customized to use our __str__ for fully qualified class name. :return str: python representation for creating this model. """ state = sa.inspect(self) field_reprs = [] fields = state.mapper.columns.keys() for key in fields: value = state.attrs[key].loaded_value if value == NO_VALUE: value = '<not loaded>' else: value = repr(value) field_reprs.append('='.join((key, value))) return "{0}({1})".format(self, ', '.join(field_reprs))
def unique_constraint_reflection(self): def doesnt_have_check_uq_constraints(config): if not util.sqla_084: return True from sqlalchemy import inspect insp = inspect(config.db) try: insp.get_unique_constraints('x') except NotImplementedError: return True except TypeError: return True except Exception: pass return False return exclusions.skip_if( lambda config: not util.sqla_084, "SQLAlchemy 0.8.4 or greater required" ) + exclusions.skip_if(doesnt_have_check_uq_constraints)
def object_mapper(instance): """Given an object, return the primary Mapper associated with the object instance. Raises :class:`sqlalchemy.orm.exc.UnmappedInstanceError` if no mapping is configured. This function is available via the inspection system as:: inspect(instance).mapper Using the inspection system will raise :class:`sqlalchemy.exc.NoInspectionAvailable` if the instance is not part of a mapping. """ return object_state(instance).mapper
def class_mapper(class_, configure=True): """Given a class, return the primary :class:`.Mapper` associated with the key. Raises :class:`.UnmappedClassError` if no mapping is configured on the given class, or :class:`.ArgumentError` if a non-class object is passed. Equivalent functionality is available via the :func:`.inspect` function as:: inspect(some_mapped_class) Using the inspection system will raise :class:`sqlalchemy.exc.NoInspectionAvailable` if the class is not mapped. """ mapper = _inspect_mapped_class(class_, configure=configure) if mapper is None: if not isinstance(class_, type): raise sa_exc.ArgumentError( "Class object expected, got '%r'." % class_) raise exc.UnmappedClassError(class_) else: return mapper
def object_as_dict(obj): return {c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs}
def parent(self): """Return an inspection instance representing the parent. This will be either an instance of :class:`.Mapper` or :class:`.AliasedInsp`, depending upon the nature of the parent entity which this attribute is associated with. """ return inspection.inspect(self._parententity)
def stop_test_class(cls): #from sqlalchemy import inspect #assert not inspect(testing.db).get_table_names() engines.testing_reaper._stop_test_ctx() if not options.low_connections: assertions.global_cleanup_assertions() _restore_engine()
def test_get_schema_names(self): insp = inspect(testing.db) self.assert_(testing.config.test_schema in insp.get_schema_names())
def test_get_default_schema_name(self): insp = inspect(testing.db) eq_(insp.default_schema_name, testing.db.dialect.default_schema_name)
def test_get_temp_table_names(self): insp = inspect(self.bind) temp_table_names = insp.get_temp_table_names() eq_(sorted(temp_table_names), ['user_tmp'])
def test_get_temp_view_names(self): insp = inspect(self.bind) temp_table_names = insp.get_temp_view_names() eq_(sorted(temp_table_names), ['user_tmp_v'])
def test_nullable_reflection(self): t = Table('t', self.metadata, Column('a', Integer, nullable=True), Column('b', Integer, nullable=False)) t.create() eq_( dict( (col['name'], col['nullable']) for col in inspect(self.metadata.bind).get_columns('t') ), {"a": True, "b": False} )
def test_get_temp_table_columns(self): meta = MetaData(self.bind) user_tmp = self.tables.user_tmp insp = inspect(meta.bind) cols = insp.get_columns('user_tmp') self.assert_(len(cols) > 0, len(cols)) for i, col in enumerate(user_tmp.columns): eq_(col.name, cols[i]['name'])
def test_get_temp_view_columns(self): insp = inspect(self.bind) cols = insp.get_columns('user_tmp_v') eq_( [col['name'] for col in cols], ['id', 'name', 'foo'] )
def _test_get_pk_constraint(self, schema=None): meta = self.metadata users, addresses = self.tables.users, self.tables.email_addresses insp = inspect(meta.bind) users_cons = insp.get_pk_constraint(users.name, schema=schema) users_pkeys = users_cons['constrained_columns'] eq_(users_pkeys, ['user_id']) addr_cons = insp.get_pk_constraint(addresses.name, schema=schema) addr_pkeys = addr_cons['constrained_columns'] eq_(addr_pkeys, ['address_id']) with testing.requires.reflects_pk_names.fail_if(): eq_(addr_cons['name'], 'email_ad_pk')
def _test_get_foreign_keys(self, schema=None): meta = self.metadata users, addresses, dingalings = self.tables.users, \ self.tables.email_addresses, self.tables.dingalings insp = inspect(meta.bind) expected_schema = schema # users if testing.requires.self_referential_foreign_keys.enabled: users_fkeys = insp.get_foreign_keys(users.name, schema=schema) fkey1 = users_fkeys[0] with testing.requires.named_constraints.fail_if(): self.assert_(fkey1['name'] is not None) eq_(fkey1['referred_schema'], expected_schema) eq_(fkey1['referred_table'], users.name) eq_(fkey1['referred_columns'], ['user_id', ]) if testing.requires.self_referential_foreign_keys.enabled: eq_(fkey1['constrained_columns'], ['parent_user_id']) # addresses addr_fkeys = insp.get_foreign_keys(addresses.name, schema=schema) fkey1 = addr_fkeys[0] with testing.requires.named_constraints.fail_if(): self.assert_(fkey1['name'] is not None) eq_(fkey1['referred_schema'], expected_schema) eq_(fkey1['referred_table'], users.name) eq_(fkey1['referred_columns'], ['user_id', ]) eq_(fkey1['constrained_columns'], ['remote_user_id'])
def test_get_temp_table_unique_constraints(self): insp = inspect(self.bind) reflected = insp.get_unique_constraints('user_tmp') for refl in reflected: # Different dialects handle duplicate index and constraints # differently, so ignore this flag refl.pop('duplicates_index', None) eq_(reflected, [{'column_names': ['name'], 'name': 'user_tmp_uq'}])
def test_get_temp_table_indexes(self): insp = inspect(self.bind) indexes = insp.get_indexes('user_tmp') for ind in indexes: ind.pop('dialect_options', None) eq_( # TODO: we need to add better filtering for indexes/uq constraints # that are doubled up [idx for idx in indexes if idx['name'] == 'user_tmp_ix'], [{'unique': False, 'column_names': ['foo'], 'name': 'user_tmp_ix'}] )
def _test_get_view_definition(self, schema=None): meta = self.metadata users, addresses, dingalings = self.tables.users, \ self.tables.email_addresses, self.tables.dingalings view_name1 = 'users_v' view_name2 = 'email_addresses_v' insp = inspect(meta.bind) v1 = insp.get_view_definition(view_name1, schema=schema) self.assert_(v1) v2 = insp.get_view_definition(view_name2, schema=schema) self.assert_(v2)
def _test_get_table_oid(self, table_name, schema=None): meta = self.metadata users, addresses, dingalings = self.tables.users, \ self.tables.email_addresses, self.tables.dingalings insp = inspect(meta.bind) oid = insp.get_table_oid(table_name, schema) self.assert_(isinstance(oid, int))
def stop_test_class(cls): #from sqlalchemy import inspect #assert not inspect(testing.db).get_table_names() _restore_engine()
def record_ops(session, flush_context=None, instances=None): try: d = session._model_changes except AttributeError: return for targets, operation in ((session.new, 'insert'), (session.dirty, 'update'), (session.deleted, 'delete')): for target in targets: state = inspect(target) key = state.identity_key if state.has_identity else id(target) d[key] = (target, operation)