我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.schema.MetaData()。
def provide_metadata(fn, *args, **kw): """Provide bound MetaData for a single test, dropping afterwards.""" from . import config from . import engines from sqlalchemy import schema metadata = schema.MetaData(config.db) self = args[0] prev_meta = getattr(self, 'metadata', None) self.metadata = metadata try: return fn(*args, **kw) finally: engines.drop_all_tables(metadata, config.db) self.metadata = prev_meta
def _create_table_setup(self): from sqlalchemy import Table, Column, PrimaryKeyConstraint column_names_and_types = \ self._get_column_names_and_types(self._sqlalchemy_type) columns = [Column(name, typ, index=is_index) for name, typ, is_index in column_names_and_types] if self.keys is not None: if not com.is_list_like(self.keys): keys = [self.keys] else: keys = self.keys pkc = PrimaryKeyConstraint(*keys, name=self.name + '_pk') columns.append(pkc) schema = self.schema or self.pd_sql.meta.schema # At this point, attach to new metadata, only attach to self.meta # once table is created. from sqlalchemy.schema import MetaData meta = MetaData(self.pd_sql, schema=schema) return Table(self.name, meta, *columns, schema=schema)
def drop_all_tables(engine, inspector, schema=None, include_names=None): from sqlalchemy import Column, Table, Integer, MetaData, \ ForeignKeyConstraint from sqlalchemy.schema import DropTable, DropConstraint if include_names is not None: include_names = set(include_names) with engine.connect() as conn: for tname, fkcs in reversed( inspector.get_sorted_table_and_fkc_names(schema=schema)): if tname: if include_names is not None and tname not in include_names: continue conn.execute(DropTable( Table(tname, MetaData(), schema=schema) )) elif fkcs: if not engine.dialect.supports_alter: continue for tname, fkc in fkcs: if include_names is not None and \ tname not in include_names: continue tb = Table( tname, MetaData(), Column('x', Integer), Column('y', Integer), schema=schema ) conn.execute(DropConstraint( ForeignKeyConstraint( [tb.c.x], [tb.c.y], name=fkc) ))
def provide_metadata(fn, *args, **kw): """Provide bound MetaData for a single test, dropping afterwards.""" from . import config from sqlalchemy import schema metadata = schema.MetaData(config.db) self = args[0] prev_meta = getattr(self, 'metadata', None) self.metadata = metadata try: return fn(*args, **kw) finally: metadata.drop_all() self.metadata = prev_meta
def get_metadata(self, force=False): if self.metadata and not force: return self.metadata dbconf=self.dbconf engine = self.get_engine(force=force) schema=dbconf.get('schema') self.metadata=metadata=MetaData(bind=engine, schema=schema) return metadata
def _primary_key_constraint(self, name, table_name, cols, schema=None): m = sa_schema.MetaData() columns = [sa_schema.Column(n, NULLTYPE) for n in cols] t1 = sa_schema.Table(table_name, m, *columns, schema=schema) p = sa_schema.PrimaryKeyConstraint(*columns, name=name) t1.append_constraint(p) return p
def _foreign_key_constraint(self, name, source, referent, local_cols, remote_cols, onupdate=None, ondelete=None, deferrable=None, source_schema=None, referent_schema=None): m = sa_schema.MetaData() if source == referent: t1_cols = local_cols + remote_cols else: t1_cols = local_cols sa_schema.Table(referent, m, *[sa_schema.Column(n, NULLTYPE) for n in remote_cols], schema=referent_schema) t1 = sa_schema.Table(source, m, *[sa_schema.Column(n, NULLTYPE) for n in t1_cols], schema=source_schema) tname = "%s.%s" % (referent_schema, referent) if referent_schema \ else referent f = sa_schema.ForeignKeyConstraint(local_cols, ["%s.%s" % (tname, n) for n in remote_cols], name=name, onupdate=onupdate, ondelete=ondelete, deferrable=deferrable ) t1.append_constraint(f) return f
def _unique_constraint(self, name, source, local_cols, schema=None, **kw): t = sa_schema.Table(source, sa_schema.MetaData(), *[sa_schema.Column(n, NULLTYPE) for n in local_cols], schema=schema) kw['name'] = name uq = sa_schema.UniqueConstraint(*[t.c[n] for n in local_cols], **kw) # TODO: need event tests to ensure the event # is fired off here t.append_constraint(uq) return uq
def _table(self, name, *columns, **kw): m = sa_schema.MetaData() t = sa_schema.Table(name, m, *columns, **kw) for f in t.foreign_keys: self._ensure_table_for_fk(m, f) return t
def _index(self, name, tablename, columns, schema=None, **kw): t = sa_schema.Table(tablename or 'no_table', sa_schema.MetaData(), *[sa_schema.Column(n, NULLTYPE) for n in columns], schema=schema ) return sa_schema.Index(name, *[t.c[n] for n in columns], **kw)
def metadata_from_session(s): """ Args: s: an SQLAlchemy :class:`Session` Returns: The metadata. Get the metadata associated with the schema. """ meta = MetaData() meta.reflect(bind=s.bind) return meta
def _prep_testing_database(options, file_config): from sqlalchemy.testing import engines from sqlalchemy import schema, inspect # also create alt schemas etc. here? if options.dropfirst: e = engines.utf8_engine() inspector = inspect(e) try: view_names = inspector.get_view_names() except NotImplementedError: pass else: for vname in view_names: e.execute(schema._DropView(schema.Table(vname, schema.MetaData()))) try: view_names = inspector.get_view_names(schema="test_schema") except NotImplementedError: pass else: for vname in view_names: e.execute(schema._DropView( schema.Table(vname, schema.MetaData(), schema="test_schema"))) for tname in reversed(inspector.get_table_names(order_by="foreign_key")): e.execute(schema.DropTable(schema.Table(tname, schema.MetaData()))) for tname in reversed(inspector.get_table_names( order_by="foreign_key", schema="test_schema")): e.execute(schema.DropTable( schema.Table(tname, schema.MetaData(), schema="test_schema"))) e.dispose()
def upgrade_database(config: Config, engine: Engine, metadata: MetaData, *, revision: str='head', module_name: typing.Optional[str]=None) -> None: """Upgrades the database schema to the chosen ``revision`` (default is head). """ script = ScriptDirectory.from_config(config) def upgrade(rev, context): def update_current_rev(old, new): if old == new: return if new is None: context.impl._exec(context._version.delete()) elif old is None: context.impl._exec( context._version.insert().values( version_num=literal_column("'%s'" % new) ) ) else: context.impl._exec( context._version.update().values( version_num=literal_column("'%s'" % new) ) ) if not rev and revision == 'head': if module_name: import_all_modules(module_name) metadata.create_all(engine) dest = script.get_revision(revision) update_current_rev(None, dest and dest.revision) return [] return script._upgrade_revs(revision, rev) with EnvironmentContext(config, script, fn=upgrade, as_sql=False, destination_rev=revision, tag=None): script.run_env()
def test_connection( ctx: object, metadata: MetaData, engine: Engine, real_transaction: bool=False ) -> typing.Generator: """Joining a SQLAlchemy session into an external transaction for test suit. .. seealso:: Documentation of the SQLAlchemy session used in test suites. <http://docs.sqlalchemy.org/en/latest/orm/session_transaction.html#joining-a-session-into-an-external-transaction-such-as-for-test-suites> """ # noqa if real_transaction: metadata.create_all(engine) try: yield engine finally: metadata.drop_all(engine, checkfirst=True) return connection = engine.connect() try: metadata.drop_all(connection, checkfirst=True) transaction = connection.begin() try: metadata.create_all(bind=connection) ctx._test_fx_connection = connection try: yield connection finally: del ctx._test_fx_connection finally: transaction.rollback() finally: connection.close() engine.dispose()
def main(): parser = argparse.ArgumentParser(description='Generates SQLAlchemy model code from an existing database.') parser.add_argument('url', nargs='?', help='SQLAlchemy url to the database') parser.add_argument('--version', action='store_true', help="print the version number and exit") parser.add_argument('--schema', help='load tables from an alternate schema') parser.add_argument('--tables', help='tables to process (comma-separated, default: all)') parser.add_argument('--noviews', action='store_true', help="ignore views") parser.add_argument('--noindexes', action='store_true', help='ignore indexes') parser.add_argument('--noconstraints', action='store_true', help='ignore constraints') parser.add_argument('--nojoined', action='store_true', help="don't autodetect joined table inheritance") parser.add_argument('--noinflect', action='store_true', help="don't try to convert tables names to singular form") parser.add_argument('--noclasses', action='store_true', help="don't generate classes, only tables") parser.add_argument('--outfile', help='file to write output to (default: stdout)') args = parser.parse_args() if args.version: version = pkg_resources.get_distribution('sqlacodegen').parsed_version print(version.public) return if not args.url: print('You must supply a url\n', file=sys.stderr) parser.print_help() return engine = create_engine(args.url) metadata = MetaData(engine) tables = args.tables.split(',') if args.tables else None metadata.reflect(engine, args.schema, not args.noviews, tables) outfile = codecs.open(args.outfile, 'w', encoding='utf-8') if args.outfile else sys.stdout generator = CodeGenerator(metadata, args.noindexes, args.noconstraints, args.nojoined, args.noinflect, args.noclasses) generator.render(outfile)