我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.schema.DropTable()。
def create_table(request, sa_table, database, loop, create_entries):
    """Yield a coroutine that creates and fills ``sa_table``; drop it afterwards.

    The yielded coroutine accepts ``rows``, emits ``CREATE TABLE`` for
    ``sa_table``, inserts the values returned by ``create_entries(rows)``,
    issues an explicit ``commit;`` and returns ``sa_table``.

    When the generator is resumed after the ``yield``, a teardown coroutine
    that emits ``DROP TABLE`` followed by ``commit;`` is run to completion
    on ``loop``.

    NOTE(review): the shape (setup, ``yield``, teardown) suggests a pytest
    yield-fixture, but no decorator is visible here; ``request`` is unused.
    """

    async def teardown():
        ddl = DropTable(sa_table)
        async with database.acquire() as connection:
            await connection.execute(ddl)
            await connection.execute('commit;')

    async def populate(rows):
        ddl = CreateTable(sa_table)
        async with database.acquire() as connection:
            await connection.execute(ddl)
            insert_stmt = sa_table.insert().values(create_entries(rows))
            await connection.execute(insert_stmt)
            await connection.execute('commit;')
        return sa_table

    yield populate

    loop.run_until_complete(teardown())
def drop_all_tables(engine, inspector, schema=None, include_names=None):
    """Drop the tables and leftover foreign-key constraints of ``schema``.

    The entries returned by
    ``inspector.get_sorted_table_and_fkc_names(schema=schema)`` are walked
    in reverse.  An entry with a table name produces a ``DROP TABLE``; an
    entry without one but with ``(table, constraint)`` pairs produces a
    ``DROP CONSTRAINT`` per pair, and only when the dialect reports
    ``supports_alter``.

    :param engine: engine used to open the connection and to read
        ``dialect.supports_alter``.
    :param inspector: object providing ``get_sorted_table_and_fkc_names``.
    :param schema: schema name passed to the inspector and to every
        ``Table`` built here.
    :param include_names: optional iterable of table names; when given,
        tables (and constraints of tables) not listed are left alone.
    """
    from sqlalchemy import Column, Table, Integer, MetaData, \
        ForeignKeyConstraint
    from sqlalchemy.schema import DropTable, DropConstraint

    wanted = None if include_names is None else set(include_names)

    def is_excluded(name):
        return wanted is not None and name not in wanted

    with engine.connect() as conn:
        ordered = inspector.get_sorted_table_and_fkc_names(schema=schema)
        for table_name, constraints in reversed(ordered):
            if table_name:
                if not is_excluded(table_name):
                    conn.execute(DropTable(
                        Table(table_name, MetaData(), schema=schema)))
                continue

            if not constraints or not engine.dialect.supports_alter:
                continue

            for owner_name, constraint_name in constraints:
                if is_excluded(owner_name):
                    continue
                # Stand-in table with two dummy columns, used only to build
                # a ForeignKeyConstraint object carrying the reflected name.
                stub = Table(
                    owner_name, MetaData(),
                    Column('x', Integer),
                    Column('y', Integer),
                    schema=schema,
                )
                conn.execute(DropConstraint(
                    ForeignKeyConstraint(
                        [stub.c.x], [stub.c.y], name=constraint_name)))
def drop_table(self, table):
    """Build a ``DropTable`` construct for ``table`` and pass it to ``self._exec``."""
    run = self._exec
    run(schema.DropTable(table))
def delete_table(self):
    """Log the intent, then drop this object's SQL table.

    The table object is obtained from
    ``self.get_sql_table_object(need_columns=False)`` and a ``DropTable``
    construct for it is executed on ``self.local_engine``.

    :return: whatever ``self.local_engine.execute`` returns.
    """
    message = 'try to delete table {} in {}'.format(
        self.sql_table_name, self.schema)
    logger.info(message)

    target = self.get_sql_table_object(need_columns=False)
    return self.local_engine.execute(DropTable(target))
def _prep_testing_database(options, file_config):
    """Drop all views and tables from the test database when requested.

    Nothing happens unless ``options.dropfirst`` is truthy.  Otherwise an
    engine from ``engines.utf8_engine()`` is inspected and, in this order,
    the views of the default schema, the views of ``"test_schema"``, the
    tables of the default schema and the tables of ``"test_schema"`` are
    dropped.  Tables are dropped in reverse of the inspector's
    ``order_by="foreign_key"`` ordering.  The engine is disposed at the end.

    ``file_config`` is accepted but not used here.
    """
    from sqlalchemy.testing import engines
    from sqlalchemy import schema, inspect

    # also create alt schemas etc. here?
    if not options.dropfirst:
        return

    engine = engines.utf8_engine()
    insp = inspect(engine)

    def drop_views(**schema_kw):
        # An inspector that cannot list views raises NotImplementedError;
        # in that case there is nothing to drop.
        try:
            names = insp.get_view_names(**schema_kw)
        except NotImplementedError:
            return
        for name in names:
            view = schema.Table(name, schema.MetaData(), **schema_kw)
            engine.execute(schema._DropView(view))

    def drop_tables(**schema_kw):
        names = insp.get_table_names(order_by="foreign_key", **schema_kw)
        for name in reversed(names):
            table = schema.Table(name, schema.MetaData(), **schema_kw)
            engine.execute(schema.DropTable(table))

    drop_views()
    drop_views(schema="test_schema")
    drop_tables()
    drop_tables(schema="test_schema")

    engine.dispose()
def drop_all_tables(engine, inspector, schema=None, include_names=None):
    """Drop the tables and leftover foreign-key constraints of ``schema``.

    The entries returned by
    ``inspector.get_sorted_table_and_fkc_names(schema=schema)`` are walked
    in reverse.  An entry with a table name produces a ``DROP TABLE``; an
    entry without one but with ``(table, constraint)`` pairs produces a
    ``DROP CONSTRAINT`` per pair, and only when the dialect reports
    ``supports_alter``.

    :param engine: engine used to open the connection and to read
        ``dialect.supports_alter``.
    :param inspector: object providing ``get_sorted_table_and_fkc_names``.
    :param schema: schema name passed to the inspector and to every
        ``Table`` built here, for both table and constraint drops.
    :param include_names: optional iterable of table names; when given,
        tables (and constraints of tables) not listed are left alone.
    """
    from sqlalchemy import Column, Table, Integer, MetaData, \
        ForeignKeyConstraint
    from sqlalchemy.schema import DropTable, DropConstraint

    if include_names is not None:
        include_names = set(include_names)

    with engine.connect() as conn:
        for tname, fkcs in reversed(
                inspector.get_sorted_table_and_fkc_names(schema=schema)):
            if tname:
                if include_names is not None and tname not in include_names:
                    continue
                # The table must carry the same schema the names were
                # reflected from; without it the DROP would be emitted
                # against the default schema instead.
                conn.execute(DropTable(
                    Table(tname, MetaData(), schema=schema)
                ))
            elif fkcs:
                if not engine.dialect.supports_alter:
                    continue
                for fk_tname, fkc in fkcs:
                    if include_names is not None and \
                            fk_tname not in include_names:
                        continue
                    # Stand-in table with two dummy columns, used only to
                    # build a ForeignKeyConstraint carrying the reflected
                    # constraint name.
                    tb = Table(
                        fk_tname, MetaData(),
                        Column('x', Integer),
                        Column('y', Integer),
                        schema=schema
                    )
                    conn.execute(DropConstraint(
                        ForeignKeyConstraint(
                            [tb.c.x], [tb.c.y], name=fkc)
                    ))
async def preapre_tables(pg):
    """Drop and then recreate the ``db.question`` and ``db.choice`` tables.

    Declared as a coroutine because the body uses ``async with`` and
    ``await``, which are only valid inside ``async def``.

    Tables are dropped in reverse list order and created in list order,
    each phase on its own connection acquired from ``pg``.

    :param pg: object whose ``acquire()`` returns an async context manager
        yielding a connection with an awaitable ``execute``.
    """
    tables = [db.question, db.choice]

    async with pg.acquire() as conn:
        for table in reversed(tables):
            drop_expr = DropTable(table)
            try:
                await conn.execute(drop_expr)
            except psycopg2.ProgrammingError:
                # Best-effort drop: presumably the table does not exist
                # yet, so the failure is deliberately ignored.
                pass

    async with pg.acquire() as conn:
        for table in tables:
            create_expr = CreateTable(table)
            await conn.execute(create_expr)
async def delete_tables(pg, tables):
    """Drop every table in ``tables``, last one first.

    Declared as a coroutine because the body uses ``async with`` and
    ``await``, which are only valid inside ``async def``.

    :param pg: object whose ``acquire()`` returns an async context manager
        yielding a connection with an awaitable ``execute``.
    :param tables: sequence of table objects accepted by ``DropTable``;
        it is iterated with ``reversed``.
    """
    async with pg.acquire() as conn:
        for table in reversed(tables):
            drop_expr = DropTable(table)
            try:
                await conn.execute(drop_expr)
            except psycopg2.ProgrammingError:
                # Best-effort drop: presumably the table is already gone,
                # so the failure is deliberately ignored.
                pass
def _prep_testing_database(options, file_config):
    """Drop views, tables and PostgreSQL enums on every configured database.

    Nothing is dropped unless ``options.dropfirst`` is truthy.  For each
    configuration from ``config.Config.all_configs()`` the database
    ``cfg.db`` is inspected and, in this order:

    * views of the default schema are dropped;
    * views of ``"test_schema"`` are dropped, if schemas are enabled for
      the configuration;
    * tables of the default schema are dropped, in reverse of the
      inspector's ``order_by="foreign_key"`` ordering;
    * tables of ``"test_schema"`` are dropped likewise, if schemas are
      enabled;
    * on PostgreSQL with ``util.sqla_100`` set, every enum type returned
      by ``inspector.get_enums("*")`` is dropped.

    ``file_config`` is accepted but not used here.
    """
    from alembic.testing import config
    from alembic.testing.exclusions import against
    from sqlalchemy import schema
    from alembic import util

    if util.sqla_08:
        from sqlalchemy import inspect
    else:
        from sqlalchemy.engine.reflection import Inspector
        inspect = Inspector.from_engine

    if not options.dropfirst:
        return

    def drop_views(engine, insp, **schema_kw):
        # An inspector that cannot list views raises NotImplementedError;
        # in that case there is nothing to drop.
        try:
            names = insp.get_view_names(**schema_kw)
        except NotImplementedError:
            return
        for name in names:
            view = schema.Table(name, schema.MetaData(), **schema_kw)
            engine.execute(schema._DropView(view))

    def drop_tables(engine, insp, **schema_kw):
        names = insp.get_table_names(order_by="foreign_key", **schema_kw)
        for name in reversed(names):
            table = schema.Table(name, schema.MetaData(), **schema_kw)
            engine.execute(schema.DropTable(table))

    for cfg in config.Config.all_configs():
        engine = cfg.db
        insp = inspect(engine)

        drop_views(engine, insp)
        if config.requirements.schemas.enabled_for_config(cfg):
            drop_views(engine, insp, schema="test_schema")

        drop_tables(engine, insp)
        if config.requirements.schemas.enabled_for_config(cfg):
            drop_tables(engine, insp, schema="test_schema")

        if against(cfg, "postgresql") and util.sqla_100:
            from sqlalchemy.dialects import postgresql
            for enum_info in insp.get_enums("*"):
                enum_type = postgresql.ENUM(
                    name=enum_info['name'],
                    schema=enum_info['schema'])
                engine.execute(postgresql.DropEnumType(enum_type))
def _prep_testing_database(options, file_config):
    """Drop all views and tables on every configured test database.

    Nothing is dropped unless ``options.dropfirst`` is truthy.  For each
    configuration from ``config.Config.all_configs()`` the database
    ``cfg.db`` is inspected and, in this order: default-schema views,
    ``"test_schema"`` views (if schemas are enabled for the configuration),
    default-schema tables, and ``"test_schema"`` tables (if schemas are
    enabled) are dropped.  Tables are dropped in reverse of the
    inspector's ``order_by="foreign_key"`` ordering.

    ``file_config`` is accepted but not used here.
    """
    from sqlalchemy.testing import config
    from sqlalchemy import schema, inspect

    if not options.dropfirst:
        return

    def drop_views(engine, insp, **schema_kw):
        # An inspector that cannot list views raises NotImplementedError;
        # in that case there is nothing to drop.
        try:
            names = insp.get_view_names(**schema_kw)
        except NotImplementedError:
            return
        for name in names:
            view = schema.Table(name, schema.MetaData(), **schema_kw)
            engine.execute(schema._DropView(view))

    def drop_tables(engine, insp, **schema_kw):
        names = insp.get_table_names(order_by="foreign_key", **schema_kw)
        for name in reversed(names):
            table = schema.Table(name, schema.MetaData(), **schema_kw)
            engine.execute(schema.DropTable(table))

    for cfg in config.Config.all_configs():
        engine = cfg.db
        insp = inspect(engine)

        drop_views(engine, insp)
        if config.requirements.schemas.enabled_for_config(cfg):
            drop_views(engine, insp, schema="test_schema")

        drop_tables(engine, insp)
        if config.requirements.schemas.enabled_for_config(cfg):
            drop_tables(engine, insp, schema="test_schema")
def drop_everything(echo=True):
    """Pre-gather all named constraints and table names, and drop everything.

    This is better than using metadata.reflect(); metadata.drop_all()
    as it handles cyclical constraints between tables.
    Ref. http://www.sqlalchemy.org/trac/wiki/UsageRecipes/DropEverything

    Every named foreign-key constraint is dropped first, then every table.
    Each DDL statement is printed before it is executed; a failing
    statement prints its traceback and the run continues with the next
    one.  The connection is closed when the function exits.

    :param echo: forwarded to ``session.get_engine(echo=...)``.
    """
    engine = session.get_engine(echo=echo)
    conn = engine.connect()
    try:
        # the transaction only applies if the DB supports
        # transactional DDL, i.e. Postgresql, MS SQL Server
        trans = conn.begin()

        inspector = reflection.Inspector.from_engine(engine)

        # gather all data first before dropping anything.
        # some DBs lock after things have been dropped in
        # a transaction.
        metadata = MetaData()

        tbs = []
        all_fks = []

        for table_name in inspector.get_table_names():
            # Unnamed constraints cannot be dropped by name, so skip them.
            fks = [
                ForeignKeyConstraint((), (), name=fk['name'])
                for fk in inspector.get_foreign_keys(table_name)
                if fk['name']
            ]
            tbs.append(Table(table_name, metadata, *fks))
            all_fks.extend(fks)

        for fkc in all_fks:
            try:
                print(str(DropConstraint(fkc)) + ';')
                conn.execute(DropConstraint(fkc))
            except Exception:
                # Best effort: report the failure and keep going.
                print(format_exc())

        for table in tbs:
            try:
                print(str(DropTable(table)).strip() + ';')
                conn.execute(DropTable(table))
            except Exception:
                # Best effort: report the failure and keep going.
                print(format_exc())

        trans.commit()
    finally:
        conn.close()