The following 50 code examples, extracted from open-source Python projects, illustrate how to use sqlalchemy.MetaData().
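Before the extracted examples, here is a minimal, self-contained sketch of the typical MetaData workflow (define, create, reflect). This is an orientation sketch rather than code from any of the projects below: the in-memory SQLite engine and the `users` table are illustrative assumptions. Note also that many of the examples that follow use `MetaData(bind=...)` and `Table(..., autoload=True)`, which are SQLAlchemy 1.x idioms that were removed in SQLAlchemy 2.0.

import sqlalchemy as sa

# Illustrative sketch only: the engine URL and table/column names are assumptions.
engine = sa.create_engine('sqlite://')  # throwaway in-memory database
metadata = sa.MetaData()

# Register a table definition on the MetaData collection.
users = sa.Table(
    'users', metadata,
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('name', sa.String(50), nullable=False),
)

# Emit CREATE TABLE for every table registered on this MetaData.
metadata.create_all(engine)

# A fresh MetaData can load the schema back from the database.
reflected = sa.MetaData()
reflected.reflect(bind=engine)
assert 'users' in reflected.tables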
def __init__(self, engine):
    self.engine = engine
    metadata = sa.MetaData(bind=engine)
    metadata.reflect(only=asset_db_table_names)
    for table_name in asset_db_table_names:
        setattr(self, table_name, metadata.tables[table_name])

    # Check the version info of the db for compatibility
    check_version_info(self.version_info, ASSET_DB_VERSION)

    # Cache for lookup of assets by sid, the objects in the asset lookup
    # may be shared with the results from equity and future lookup caches.
    #
    # The top level cache exists to minimize lookups on the asset type
    # routing.
    #
    # The caches are read through, i.e. accessing an asset through
    # retrieve_asset will populate the cache on first retrieval.
    self._caches = (self._asset_cache, self._asset_type_cache) = {}, {}

    # Populated on first call to `lifetimes`.
    self._asset_lifetimes = None
def test_write_version(self):
    env = TradingEnvironment(load=noop_load)
    metadata = sa.MetaData(bind=env.engine)
    version_table = _version_table_schema(metadata)
    version_table.delete().execute()

    # Assert that the version is not present in the table
    self.assertIsNone(sa.select((version_table.c.version,)).scalar())

    # This should fail because the table has no version info and is,
    # therefore, considered v0
    with self.assertRaises(AssetDBVersionError):
        check_version_info(version_table, -2)

    # This should not raise an error because the version has been written
    write_version_info(version_table, -2)
    check_version_info(version_table, -2)

    # Assert that the version is in the table and correct
    self.assertEqual(sa.select((version_table.c.version,)).scalar(), -2)

    # Assert that trying to overwrite the version fails
    with self.assertRaises(sa.exc.IntegrityError):
        write_version_info(version_table, -3)
def test_finder_checks_version(self):
    # Create an env and give it a bogus version number
    env = TradingEnvironment(load=noop_load)
    metadata = sa.MetaData(bind=env.engine)
    version_table = _version_table_schema(metadata)
    version_table.delete().execute()
    write_version_info(version_table, -2)
    check_version_info(version_table, -2)

    # Assert that trying to build a finder with a bad db raises an error
    with self.assertRaises(AssetDBVersionError):
        AssetFinder(engine=env.engine)

    # Change the version number of the db to the correct version
    version_table.delete().execute()
    write_version_info(version_table, ASSET_DB_VERSION)
    check_version_info(version_table, ASSET_DB_VERSION)

    # Now that the versions match, this Finder should succeed
    AssetFinder(engine=env.engine)
def bind(self):
    """Return the current "bind".

    In online mode, this is an instance of
    :class:`sqlalchemy.engine.Connection`, and is suitable
    for ad-hoc execution of any kind of usage described
    in :ref:`sqlexpression_toplevel` as well as
    for usage with the :meth:`sqlalchemy.schema.Table.create`
    and :meth:`sqlalchemy.schema.MetaData.create_all` methods
    of :class:`~sqlalchemy.schema.Table`,
    :class:`~sqlalchemy.schema.MetaData`.

    Note that when "standard output" mode is enabled,
    this bind will be a "mock" connection handler that cannot
    return results and is only appropriate for a very limited
    subset of commands.

    """
    return self.connection
def _get_autobase(table_prefix, bind):
    metadata = sa.MetaData(bind=bind)
    table_name = table_prefix + 'environment_hierarchy_level_value'
    metadata.reflect(only=[table_name])
    AutoBase = automap.automap_base(metadata=metadata)

    def classname_for_table(base, refl_table_name, table):
        assert refl_table_name.startswith(table_prefix)
        noprefix_name = refl_table_name[len(table_prefix):]
        uname = u"".join(s.capitalize() for s in noprefix_name.split('_'))
        if not isinstance(uname, str):
            return uname.encode('utf-8')
        else:
            return uname

    AutoBase.prepare(classname_for_table=classname_for_table)
    return AutoBase
def get_columns_from_etl_table(self):
    try:
        extra = {}
        meta = MetaData(**extra.get('metadata_params', {}))
        table = Table(
            self.sql_table_name, meta,
            schema=self.schema or None,
            autoload=True,
            autoload_with=self.local_engine)
    except Exception:
        raise Exception(
            "Table doesn't seem to exist in the specified database, "
            "couldn't fetch column information")
    return len(table.columns)
def _gen_sa_table(sectype, metadata=None):
    """Generate SQLAlchemy Table object by sectype.
    """
    if metadata is None:
        metadata = MetaData()
    table = Table(
        sectype, metadata,
        Column('Symbol', String(20), primary_key=True),
        Column('DataType', String(20), primary_key=True),
        Column('BarSize', String(10), primary_key=True),
        Column('TickerTime', DateTime(), primary_key=True),
        Column('opening', Float(10, 2)),
        Column('high', Float(10, 2)),
        Column('low', Float(10, 2)),
        Column('closing', Float(10, 2)),
        Column('volume', mysqlINTEGER(unsigned=True)),
        Column('barcount', mysqlINTEGER(unsigned=True)),
        Column('average', Float(10, 2))
    )
    return table
def db_version():
    repository = _find_migrate_repo()
    try:
        return versioning_api.db_version(get_engine(), repository)
    except versioning_exceptions.DatabaseNotControlledError:
        meta = sqlalchemy.MetaData()
        engine = get_engine()
        meta.reflect(bind=engine)
        tables = meta.tables
        if len(tables) == 0:
            db_version_control(version.INIT_VERSION)
            return versioning_api.db_version(get_engine(), repository)
        else:
            # Some pre-Essex DB's may not be version controlled.
            # Require them to upgrade using Essex first.
            raise exception.WeiboException(
                _("Upgrade DB using Essex release first."))
def _init_class(cls):
    if cls.run_define_tables == 'each':
        if cls.run_create_tables == 'once':
            cls.run_create_tables = 'each'
        assert cls.run_inserts in ('each', None)

    if cls.other is None:
        cls.other = adict()

    if cls.tables is None:
        cls.tables = adict()

    if cls.bind is None:
        setattr(cls, 'bind', cls.setup_bind())

    if cls.metadata is None:
        setattr(cls, 'metadata', sa.MetaData())

    if cls.metadata.bind is None:
        cls.metadata.bind = cls.bind
def __init__(self):
    super(AlchemyBase, self).__init__()

    def fk_fixed_width(constraint, table):
        str_tokens = [table.name] + \
            [element.parent.name for element in constraint.elements] + \
            [element.target_fullname for element in constraint.elements]
        guid = uuid.uuid5(uuid.NAMESPACE_OID,
                          "_".join(str_tokens).encode('ascii'))
        return str(guid)

    convention = {
        "fk_fixed_width": fk_fixed_width,
        "ix": 'ix_%(column_0_label)s',
        "uq": "uq_%(table_name)s_%(column_0_name)s",
        "ck": "ck_%(table_name)s_%(column_0_name)s",
        "fk": "fk_%(fk_fixed_width)s",
        "pk": "pk_%(table_name)s"
    }

    metadata = MetaData(naming_convention=convention)
    self.Model = declarative_base(metadata=metadata, cls=Model,
                                  name='Model',
                                  metaclass=_BoundDeclarativeMeta)
    self.Model.query = _QueryProperty(self)
def test03(self):
    """Create two tables, drop one, check the other is still there
    """
    MyModelA().create_table(self.client)
    MyModelB().create_table(self.client)

    # now there are two tables
    metadata = sa.MetaData()
    metadata.reflect(self.client.get_engine())
    self.assertEqual(len(metadata.tables), 2)

    MyModelA().drop_table(self.client)

    # one table left
    metadata = sa.MetaData()
    metadata.reflect(self.client.get_engine())
    self.assertEqual(len(metadata.tables), 1)
    self.assertEqual(list(metadata.tables.keys())[0], 'mymodelb')
def test04(self):
    """Create a table and populate it with some objects
    """
    MyModelB().create_table(self.client)

    # create some objects
    session = self.client.create_session()
    session.add(MyModelB(my_other_field=17))
    session.add(MyModelB(my_other_field=18))
    session.add(MyModelB(my_other_field=19))
    session.commit()

    metadata = sa.MetaData()
    metadata.reflect(self.client.get_engine())
    query = select([func.count()]).select_from(metadata.tables['mymodelb'])
    count = self.client.get_engine().execute(query).scalar()
    self.assertEqual(count, 3)
    session.close()
def upgrade(migrate_engine):
    meta = sqlalchemy.MetaData()
    meta.bind = migrate_engine

    job = sqlalchemy.Table(
        'job', meta,
        sqlalchemy.Column('id', sqlalchemy.String(50),
                          primary_key=True, nullable=False),
        sqlalchemy.Column('scheduler_id', sqlalchemy.String(36),
                          nullable=False),
        sqlalchemy.Column('job_type', sqlalchemy.String(10),
                          nullable=False),
        sqlalchemy.Column('parameters', types.Dict),
        sqlalchemy.Column('created_at', sqlalchemy.DateTime),
        sqlalchemy.Column('updated_at', sqlalchemy.DateTime),
        mysql_engine='InnoDB',
        mysql_charset='utf8'
    )

    try:
        job.create()
    except Exception:
        LOG.error("Table |%s| not created!", repr(job))
        raise
def test_fancy_coltypes(self):
    Table(
        'simple_items', self.metadata,
        Column('enum', ENUM('A', 'B', name='blah')),
        Column('bool', BOOLEAN),
        Column('number', NUMERIC(10, asdecimal=False)),
    )

    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Boolean, Column, Enum, MetaData, Numeric, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('enum', Enum('A', 'B', name='blah')),
    Column('bool', Boolean),
    Column('number', Numeric(10, asdecimal=False))
)
"""
def test_boolean_detection(self):
    Table(
        'simple_items', self.metadata,
        Column('bool1', INTEGER),
        Column('bool2', SMALLINT),
        Column('bool3', TINYINT),
        CheckConstraint('simple_items.bool1 IN (0, 1)'),
        CheckConstraint('simple_items.bool2 IN (0, 1)'),
        CheckConstraint('simple_items.bool3 IN (0, 1)')
    )

    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Boolean, Column, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('bool1', Boolean),
    Column('bool2', Boolean),
    Column('bool3', Boolean)
)
"""
def test_enum_detection(self):
    Table(
        'simple_items', self.metadata,
        Column('enum', VARCHAR(255)),
        CheckConstraint(r"simple_items.enum IN ('A', '\'B', 'C')")
    )

    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, Enum, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('enum', Enum('A', "\\\\'B", 'C'))
)
"""
def test_column_adaptation(self):
    Table(
        'simple_items', self.metadata,
        Column('id', BIGINT),
        Column('length', DOUBLE_PRECISION)
    )

    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import BigInteger, Column, Float, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('id', BigInteger),
    Column('length', Float)
)
"""
def test_constraints_table(self):
    Table(
        'simple_items', self.metadata,
        Column('id', INTEGER),
        Column('number', INTEGER),
        CheckConstraint('number > 0'),
        UniqueConstraint('id', 'number')
    )

    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import CheckConstraint, Column, Integer, MetaData, Table, UniqueConstraint

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('id', Integer),
    Column('number', Integer),
    CheckConstraint('number > 0'),
    UniqueConstraint('id', 'number')
)
"""
def test_noindexes_table(self):
    simple_items = Table(
        'simple_items', self.metadata,
        Column('number', INTEGER),
        CheckConstraint('number > 2')
    )
    simple_items.indexes.add(Index('idx_number', simple_items.c.number))

    assert self.generate_code(noindexes=True) == """\
# coding: utf-8
from sqlalchemy import CheckConstraint, Column, Integer, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('number', Integer),
    CheckConstraint('number > 2')
)
"""
def test_no_classes(self):
    Table(
        'simple_items', self.metadata,
        Column('id', INTEGER, primary_key=True)
    )

    assert self.generate_code(noclasses=True) == """\
# coding: utf-8
from sqlalchemy import Column, Integer, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('id', Integer, primary_key=True)
)
"""
def test_schema_boolean(self):
    Table(
        'simple_items', self.metadata,
        Column('bool1', INTEGER),
        CheckConstraint('testschema.simple_items.bool1 IN (0, 1)'),
        schema='testschema'
    )

    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Boolean, Column, MetaData, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('bool1', Boolean),
    schema='testschema'
)
"""
def test_foreign_key_options(self):
    Table(
        'simple_items', self.metadata,
        Column('name', VARCHAR, ForeignKey(
            'simple_items.name', ondelete='CASCADE', onupdate='CASCADE',
            deferrable=True, initially='DEFERRED'))
    )

    assert self.generate_code() == """\
# coding: utf-8
from sqlalchemy import Column, ForeignKey, MetaData, String, Table

metadata = MetaData()


t_simple_items = Table(
    'simple_items', metadata,
    Column('name', String, ForeignKey('simple_items.name', ondelete='CASCADE', onupdate='CASCADE', \
deferrable=True, initially='DEFERRED'))
)
"""
def _set_table(self, table):
    if isinstance(table, basestring):
        if self.alter_metadata:
            if not self.meta:
                raise ValueError("metadata must be specified for table"
                                 " reflection when using alter_metadata")
            meta = self.meta
            if self.engine:
                meta.bind = self.engine
        else:
            if not self.engine and not self.meta:
                raise ValueError("engine or metadata must be specified"
                                 " to reflect tables")
            if not self.engine:
                self.engine = self.meta.bind
            meta = sqlalchemy.MetaData(bind=self.engine)
        self._table = sqlalchemy.Table(table, meta, autoload=True)
    elif isinstance(table, sqlalchemy.Table):
        self._table = table
        if not self.alter_metadata:
            self._table.meta = sqlalchemy.MetaData(bind=self._table.bind)
def getDiffOfModelAgainstDatabase(metadata, engine, excludeTables=None):
    """
    Return differences of model against database.

    :return: object which will evaluate to :keyword:`True` if there \
      are differences else :keyword:`False`.
    """
    db_metadata = sqlalchemy.MetaData(engine, reflect=True)

    # sqlite will include a dynamically generated 'sqlite_sequence' table if
    # there are autoincrement sequences in the database; this should not be
    # compared.
    if engine.dialect.name == 'sqlite':
        if 'sqlite_sequence' in db_metadata.tables:
            db_metadata.remove(db_metadata.tables['sqlite_sequence'])

    return SchemaDiff(metadata, db_metadata,
                      labelA='model',
                      labelB='database',
                      excludeTables=excludeTables)
def setUp(self):
    super(Test4fc07b41d45c, self).setUp()
    self.previous_revision = "42a3c8c0db75"
    self.current_revision = "4fc07b41d45c"
    self.metadata = sa.MetaData(bind=self.engine)
    # NOTE(thomasem): Create a quark_ip_addresses table that has an
    # identical schema as the revision before it for the columns this data
    # migration is concerned with.
    self.ip_addresses_table = sa.Table(
        'quark_ip_addresses', self.metadata,
        sa.Column('id', sa.String(length=36), primary_key=True),
        sa.Column('_deallocated', sa.Boolean()),
        sa.Column('address_type', sa.Enum('fixed', 'shared', 'floating'))
    )
    self.metadata.create_all()
    alembic_command.stamp(self.config, self.previous_revision)
def reflect_hints_db(db_path):
    """
    Reflect the database schema of the hints database, automapping the
    existing tables.

    The NullPool is used to avoid concurrency issues with luigi. Using this
    activates pooling, but since sqlite doesn't really support pooling, what
    effectively happens is just that it locks the database and the other
    connections wait.

    :param db_path: path to hints sqlite database
    :return: sqlalchemy.MetaData object, sqlalchemy.orm.Session object
    """
    engine = sqlalchemy.create_engine('sqlite:///{}'.format(db_path),
                                      poolclass=NullPool)
    metadata = sqlalchemy.MetaData()
    metadata.reflect(bind=engine)
    Base = automap_base(metadata=metadata)
    Base.prepare()
    speciesnames = Base.classes.speciesnames
    seqnames = Base.classes.seqnames
    hints = Base.classes.hints
    featuretypes = Base.classes.featuretypes
    Session = sessionmaker(bind=engine)
    session = Session()
    return speciesnames, seqnames, hints, featuretypes, session
def sa_table():
    choices = ['a', 'b', 'c']
    meta = sa.MetaData()
    post = sa.Table(
        'test_post', meta,
        sa.Column('id', sa.Integer, nullable=False),
        sa.Column('title', sa.String(200), nullable=False),
        sa.Column('category', sa.String(200), nullable=True),
        sa.Column('body', sa.Text, nullable=False),
        sa.Column('views', sa.Integer, nullable=False),
        sa.Column('average_note', sa.Float, nullable=False),
        # sa.Column('pictures', postgresql.JSON, server_default='{}'),
        sa.Column('published_at', sa.DateTime, nullable=False),
        # sa.Column('tags', postgresql.ARRAY(sa.Integer),
        #           server_default='{}'),
        sa.Column('status',
                  sa.Enum(*choices, name="enum_name", native_enum=False),
                  server_default="a", nullable=False),
        sa.Column('visible', sa.Boolean, nullable=False),

        # Indexes #
        sa.PrimaryKeyConstraint('id', name='post_id_pkey'))
    return post
def table():
    meta = sa.MetaData()
    post = sa.Table(
        'post', meta,
        sa.Column('id', sa.Integer, nullable=False),
        sa.Column('title', sa.String(200), nullable=False),
        sa.Column('body', sa.Text, nullable=False),
        sa.Column('views', sa.Integer, nullable=False),
        sa.Column('average_note', sa.Float, nullable=False),
        sa.Column('pictures', postgresql.JSON, server_default='{}'),
        sa.Column('published_at', sa.Date, nullable=False),
        sa.Column('tags', postgresql.ARRAY(sa.Integer), server_default='[]'),

        # Indexes #
        sa.PrimaryKeyConstraint('id', name='post_id_pkey'))
    return post
def test_dtype(self):
    cols = ['A', 'B']
    data = [(0.8, True),
            (0.9, None)]
    df = DataFrame(data, columns=cols)
    df.to_sql('dtype_test', self.conn)
    df.to_sql('dtype_test2', self.conn, dtype={'B': sqlalchemy.TEXT})
    meta = sqlalchemy.schema.MetaData(bind=self.conn)
    meta.reflect()
    sqltype = meta.tables['dtype_test2'].columns['B'].type
    self.assertTrue(isinstance(sqltype, sqlalchemy.TEXT))
    self.assertRaises(ValueError, df.to_sql,
                      'error', self.conn, dtype={'B': str})

    # GH9083
    df.to_sql('dtype_test3', self.conn, dtype={'B': sqlalchemy.String(10)})
    meta.reflect()
    sqltype = meta.tables['dtype_test3'].columns['B'].type
    self.assertTrue(isinstance(sqltype, sqlalchemy.String))
    self.assertEqual(sqltype.length, 10)
def test_notnull_dtype(self):
    cols = {'Bool': Series([True, None]),
            'Date': Series([datetime(2012, 5, 1), None]),
            'Int': Series([1, None], dtype='object'),
            'Float': Series([1.1, None])
            }
    df = DataFrame(cols)

    tbl = 'notnull_dtype_test'
    df.to_sql(tbl, self.conn)
    returned_df = sql.read_sql_table(tbl, self.conn)  # noqa
    meta = sqlalchemy.schema.MetaData(bind=self.conn)
    meta.reflect()
    if self.flavor == 'mysql':
        my_type = sqltypes.Integer
    else:
        my_type = sqltypes.Boolean

    col_dict = meta.tables[tbl].columns

    self.assertTrue(isinstance(col_dict['Bool'].type, my_type))
    self.assertTrue(isinstance(col_dict['Date'].type, sqltypes.DateTime))
    self.assertTrue(isinstance(col_dict['Int'].type, sqltypes.Integer))
    self.assertTrue(isinstance(col_dict['Float'].type, sqltypes.Float))
def generate_asset_db_metadata(bind=None):
    # NOTE: When modifying this schema, update the ASSET_DB_VERSION value
    metadata = sa.MetaData(bind=bind)
    _version_table_schema(metadata)
    _equities_table_schema(metadata)
    _futures_exchanges_schema(metadata)
    _futures_root_symbols_schema(metadata)
    _futures_contracts_schema(metadata)
    _asset_router_schema(metadata)
    return metadata


# A list of the names of all tables in the assets db
# NOTE: When modifying this schema, update the ASSET_DB_VERSION value
def test_get_temp_table_columns(self):
    meta = MetaData(self.bind)
    user_tmp = self.tables.user_tmp
    insp = inspect(meta.bind)
    cols = insp.get_columns('user_tmp')
    self.assert_(len(cols) > 0, len(cols))

    for i, col in enumerate(user_tmp.columns):
        eq_(col.name, cols[i]['name'])
def test_invocation(self):
    dbapi_session = ReplayableSession()
    creator = config.db.pool._creator
    recorder = lambda: dbapi_session.recorder(creator())
    engine = create_engine(
        config.db.url, creator=recorder,
        use_native_hstore=False)
    self.metadata = MetaData(engine)
    self.engine = engine
    self.session = Session(engine)

    self.setup_engine()
    try:
        self._run_steps(ctx=self._dummy_ctx)
    finally:
        self.teardown_engine()
        engine.dispose()

    player = lambda: dbapi_session.player()
    engine = create_engine(
        config.db.url, creator=player,
        use_native_hstore=False)
    self.metadata = MetaData(engine)
    self.engine = engine
    self.session = Session(engine)

    self.setup_engine()
    try:
        self._run_steps(ctx=profiling.count_functions)
    finally:
        self.session.close()
        engine.dispose()
def get_metadata(bind):
    """Return the metadata for a bind."""
    if bind == '':
        bind = None
    m = MetaData()
    for t in target_metadata.tables.values():
        if t.info.get('bind_key') == bind:
            t.tometadata(m)
    return m
def __init__(self, **kwargs):
    """Read current structure from database"""
    super().__init__(**kwargs)

    # Generate mappings from existing tables
    metadata = MetaData(schema='raw')
    metadata.reflect(self.engine)
    Base = automap_base(metadata=metadata)
    Base.prepare()

    # Our fundamental objects are:
    self.Onion = Base.classes.hs_history
    self.Example = Base.classes.frontpage_examples
    self.Cell = Base.classes.frontpage_traces
    self.Crawl = Base.classes.crawls
def tmp_table(self):
    extra = {}
    meta = MetaData(**extra.get('metadata_params', {}))
    return Table(
        self.sql_table_name, meta,
        schema=self.schema or None,
        autoload=True,
        autoload_with=self.local_engine)
def init_db(db_info):
    db_conn = "mysql+pymysql://{0}:{1}@{2}/{3}".format(
        db_info['user'], db_info['password'],
        db_info['host'], db_info['db'])
    engine = create_engine(db_conn, echo=False)
    metadata = MetaData(engine, reflect=True)
    sec_type_list = ['Index', 'Stock', 'Option', 'Future', 'Commodity',
                     'FuturesOption', 'Forex', 'Bond', 'MutualFund', 'CFD',
                     'Warrant']
    for sectype in sec_type_list:
        if sectype not in metadata.tables.keys():
            table = _gen_sa_table(sectype, metadata=metadata)
            table.create(engine, checkfirst=True)
    engine.dispose()
def _clear_db(self):
    # delete all existing tables and create new tables
    engine = create_engine(self.db_conn, echo=False)
    metadata = MetaData(engine, reflect=True)
    sec_type_list = ['Index', 'Stock', 'Option', 'Future', 'Commodity',
                     'FuturesOption', 'Forex', 'Bond', 'MutualFund', 'CFD',
                     'Warrant']
    for sectype in sec_type_list:
        if sectype in metadata.tables.keys():
            metadata.tables[sectype].drop()
    engine.dispose()
def test_init_db(self):
    self._clear_db()
    init_db(self.db_info)
    engine = create_engine(self.db_conn, echo=False)
    metadata = MetaData(engine, reflect=True)
    sec_type_list = ['Index', 'Stock', 'Option', 'Future', 'Commodity',
                     'FuturesOption', 'Forex', 'Bond', 'MutualFund', 'CFD',
                     'Warrant']
    for sectype in sec_type_list:
        self.assertIn(sectype, metadata.tables.keys())
    engine.dispose()
def test_insert_hist_data(self):
    self._clear_db()
    init_db(self.db_info)

    # Insert two time-overlapped MarketDataBlocks
    async def run(loop, data):
        engine = await aiosa.create_engine(
            user=self.db_info['user'], db=self.db_info['db'],
            host=self.db_info['host'], password=self.db_info['password'],
            loop=loop, echo=False)
        await insert_hist_data(engine, 'Stock', data[0])
        await insert_hist_data(engine, 'Stock', data[1])
        engine.close()
        await engine.wait_closed()

    # Execute insertion
    blk0 = MarketDataBlock(testdata_insert_hist_data[0])
    blk1 = MarketDataBlock(testdata_insert_hist_data[1])
    data = [blk0, blk1]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop, data))

    # Verify insertion
    df_source = testdata_insert_hist_data[2]
    engine = create_engine(self.db_conn)
    conn = engine.connect()
    metadata = MetaData(engine, reflect=True)
    table = metadata.tables['Stock']
    result = conn.execute(select([table]))
    # self.assertEqual(result.keys(), list(df_source.columns))
    df = pd.DataFrame(result.fetchall())
    df.columns = result.keys()
    _logger.debug(df.TickerTime[0])
    df.TickerTime = pd.DatetimeIndex(df.TickerTime).tz_localize('UTC')
    df_source.TickerTime = df_source.TickerTime.apply(pd.Timestamp)
    _logger.debug(df.iloc[0])
    assert_frame_equal(df, df_source)
def load_table(name, connection):
    return sa.Table(name, sa.MetaData(), autoload=True,
                    autoload_with=connection)
def initialize(self):
    metadata = MetaData()
    logs = Table(self.table_name, metadata,
                 Column('task', String, primary_key=True),
                 Column('date_time', DateTime, primary_key=True),
                 Column('model', String),
                 Column('parameters', String),
                 Column('score', Float),
                 Column('scorer_name', String),
                 Column('validation_method', String),
                 Column('predictions', String),
                 Column('random_state', Integer))
    mapper(self.OptimizationResultLog, logs)
    metadata.create_all(bind=self.engine)
def table_object(table_name, db_engine):
    """Produce a table object for the given table name

    This does not load data about the table from the engine yet, so it
    is safe to call for a table that doesn't exist.

    Args:
        table_name (string) A table name (with schema)
        db_engine (sqlalchemy.engine)

    Returns: (sqlalchemy.Table)
    """
    schema, table = split_table(table_name)
    meta = MetaData(schema=schema, bind=db_engine)
    return Table(table, meta)