我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db.connection.schema_editor()。
def test_demo_schemata_get_migrated(self): user = User.objects.create_user(**CREDENTIALS) schema = DemoSchema.objects.create(user=user, from_template=self.template) operation = migrations.CreateModel("Pony", [ ('pony_id', models.AutoField(primary_key=True)), ('pink', models.IntegerField(default=1)), ]) project_state = ProjectState() new_state = project_state.clone() operation.state_forwards('tests', new_state) schema.activate() self.assertFalse('tests_pony' in get_table_list()) with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) schema.activate() self.assertTrue('tests_pony' in get_table_list()) with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) schema.activate() self.assertFalse('tests_pony' in get_table_list())
def test_unique(self): num_constraints_before = len(self.get_constraints(RoomBooking)) # Add the index index_name = 'roombooking_test_idx' index = PartialIndex(fields=['user', 'room'], name=index_name, unique=True, where='deleted_at IS NULL') with self.schema_editor() as editor: editor.add_index(RoomBooking, index) constraints = self.get_constraints(RoomBooking) self.assertEqual(len(constraints), num_constraints_before + 1) self.assertEqual(constraints[index_name]['columns'], ['user_id', 'room_id']) self.assertEqual(constraints[index_name]['primary_key'], False) self.assertEqual(constraints[index_name]['check'], False) self.assertEqual(constraints[index_name]['index'], True) self.assertEqual(constraints[index_name]['unique'], True) # Drop the index with self.schema_editor() as editor: editor.remove_index(RoomBooking, index) constraints = self.get_constraints(RoomBooking) self.assertEqual(len(constraints), num_constraints_before) self.assertNotIn(index_name, constraints)
def test_not_unique(self): num_constraints_before = len(self.get_constraints(Job)) # Add the index index_name = 'job_test_idx' index = PartialIndex(fields=['-group'], name=index_name, unique=False, where_postgresql='is_complete = false', where_sqlite='is_complete = 0') with self.schema_editor() as editor: editor.add_index(Job, index) constraints = self.get_constraints(Job) self.assertEqual(len(constraints), num_constraints_before + 1) self.assertEqual(constraints[index_name]['columns'], ['group']) self.assertEqual(constraints[index_name]['orders'], ['DESC']) self.assertEqual(constraints[index_name]['primary_key'], False) self.assertEqual(constraints[index_name]['check'], False) self.assertEqual(constraints[index_name]['index'], True) self.assertEqual(constraints[index_name]['unique'], False) # Drop the index with self.schema_editor() as editor: editor.remove_index(Job, index) constraints = self.get_constraints(Job) self.assertEqual(len(constraints), num_constraints_before) self.assertNotIn(index_name, constraints)
def ready(self): """ This supports two use cases for using tsvector_field: 1. Configure your Django project to use tsvecotr_field's DatabaseSchemaEditor directly by creating your own DatabaseWrapper and referencing tsvector_field.DatabaseSchemaEditor in the SchemaEditorClass attribute. See: tsvector_field/schema.py for more info. 2. Just add `tsvector_field` to your project's INSTALLED_APPS setting and this will use the `pre_migrate` mechanism. Note: `pre_migrate` is not fired for ./manage.py migrate --run-syncdb. So if you are building apps without migrations you will have to use the more reliable approach in option #1. """ from django.db import connection from . import DatabaseSchemaEditor if not isinstance(connection.schema_editor(), DatabaseSchemaEditor): # only register for pre_migrate if we're not already configured # with the DatabaseSchemaEditor, see option #1 in doc above from django.db.models.signals import pre_migrate from .receivers import inject_trigger_operations pre_migrate.connect(inject_trigger_operations)
def migrate(self, ops, state=None): class Migration(migrations.Migration): operations = ops migration = Migration('name', 'tests') inject_trigger_operations([(migration, False)]) with connection.schema_editor() as schema_editor: return migration.apply(state or ProjectState.from_apps(self.apps), schema_editor)
def test_add_check_constraint(self): project_state = self.set_up_test_model() operation = migrations.AlterField( model_name='pony', name='pink', field=models.PositiveIntegerField(default=3) ) new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertNoConstraint('tests_pony', ['pink'], 'check') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertConstraint('tests_pony', ['pink'], 'check') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertNoConstraint('tests_pony', ['pink'], 'check')
def test_add_unique_constraint(self): project_state = self.set_up_test_model() operation = migrations.AlterField( model_name='pony', name='pink', field=models.IntegerField(unique=True, default=3) ) new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertNoConstraint('tests_pony', ['pink'], 'unique') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertConstraint('tests_pony', ['pink'], 'unique') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertNoConstraint('tests_pony', ['pink'], 'unique')
def test_custom_migration_operation(self): project_state = self.set_up_test_model() operation = AddField( app_label='tests', model_name='pony', name='yellow', field=models.BooleanField(default=True) ) new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertColumnNotExists('tests_pony', 'yellow') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertColumnExists('tests_pony', 'yellow') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertColumnNotExists('tests_pony', 'yellow')
def test_migrations_apply_to_templates(self): template = SchemaTemplate.objects.create(name='a') operation = migrations.CreateModel("Pony", [ ('pony_id', models.AutoField(primary_key=True)), ('pink', models.IntegerField(default=1)), ]) project_state = ProjectState() new_state = project_state.clone() operation.state_forwards('tests', new_state) template.activate() self.assertFalse('tests_pony' in get_table_list()) with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) template.activate() self.assertTrue('tests_pony' in get_table_list()) with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) template.activate() self.assertFalse('tests_pony' in get_table_list())
def filtered_schema_editor(*filters: List[str]): """Gets a schema editor, but filters executed SQL statements based on the specified text filters. Arguments: filters: List of strings to filter SQL statements on. """ with connection.schema_editor() as schema_editor: wrapper_for = schema_editor.execute with mock.patch.object(BaseDatabaseSchemaEditor, 'execute', wraps=wrapper_for) as execute: filter_results = {} yield schema_editor, filter_results for filter_text in filters: filter_results[filter_text] = [ call for call in execute.mock_calls if filter_text in str(call) ]
def create_drop_model(field, filters: List[str]): """Creates and drops a model with the specified field. Arguments: field: The field to include on the model to create and drop. filters: List of strings to filter SQL statements on. """ model = define_fake_model({'title': field}) with filtered_schema_editor(*filters) as (schema_editor, calls): execute_migration(schema_editor, [ migrations.CreateModel( model.__name__, fields=[ ('title', field.clone()) ] ), migrations.DeleteModel( model.__name__, ) ]) yield calls
def migrate(self, *filters: List[str]): """ Executes the recorded migrations. Arguments: filters: List of strings to filter SQL statements on. Returns: The filtered calls of every migration """ calls_for_migrations = [] while len(self.migrations) > 0: migration = self.migrations.pop() with filtered_schema_editor(*filters) as (schema_editor, calls): migration_executor = MigrationExecutor(schema_editor.connection) migration_executor.apply_migration( self.project_state, migration ) calls_for_migrations.append(calls) return calls_for_migrations
def get_fake_model(fields=None, model_base=PostgresModel, meta_options={}): """Creates a fake model to use during unit tests.""" model = define_fake_model(fields, model_base, meta_options) class TestProject: def clone(self, *_args, **_kwargs): return self @property def apps(self): return self class TestMigration(migrations.Migration): operations = [HStoreExtension()] with connection.schema_editor() as schema_editor: migration_executor = MigrationExecutor(schema_editor.connection) migration_executor.apply_migration( TestProject(), TestMigration('eh', 'postgres_extra')) schema_editor.create_model(model) return model
def get_fake_model(fields=None, model_base=LocalizedModel, meta_options={}): """Creates a fake model to use during unit tests.""" model = define_fake_model(fields, model_base, meta_options) class TestProject: def clone(self, *_args, **_kwargs): return self @property def apps(self): return self class TestMigration(migrations.Migration): operations = [HStoreExtension()] with connection.schema_editor() as schema_editor: migration_executor = MigrationExecutor(schema_editor.connection) migration_executor.apply_migration( TestProject(), TestMigration('eh', 'postgres_extra')) schema_editor.create_model(model) return model
def __enter__(self): ''' Create the tables in our database ''' with connection.schema_editor() as editor: for Model in self.models: editor.create_model(Model)
def __exit__(self, *args): ''' Remove the tables from our database ''' with connection.schema_editor() as editor: for Model in reversed(self.models): editor.delete_model(Model)
def add_constraints_and_indexes(self): """ Re-create constraints and indexes on the model and its fields. """ with connection.schema_editor() as schema_editor: schema_editor.alter_unique_together( self.model, (), self.model._meta.unique_together, ) schema_editor.alter_index_together( self.model, (), self.model._meta.index_together, ) for field in self.model.objects.constrained_fields: field_copy = field.__copy__() field_copy.db_constraint = False schema_editor.alter_field( self.model, field_copy, field ) for field in self.model.objects.indexed_fields: field_copy = field.__copy__() field_copy.db_index = False schema_editor.alter_field( self.model, field_copy, field )
def drop_constraints_and_indexes(self): """ Temporarily drop constraints and indexes on the model and its fields. """ with connection.schema_editor() as schema_editor: schema_editor.alter_unique_together( self.model, self.model._meta.unique_together, (), ) schema_editor.alter_index_together( self.model, self.model._meta.index_together, (), ) for field in self.model.objects.constrained_fields: field_copy = field.__copy__() field_copy.db_constraint = False schema_editor.alter_field( self.model, field, field_copy ) for field in self.model.objects.indexed_fields: field_copy = field.__copy__() field_copy.db_index = False schema_editor.alter_field( self.model, field, field_copy )
def schema_editor(self): # collect_sql=True -> do not actually execute. return connection.schema_editor(collect_sql=True)
def test_roombooking_createsql(self): with self.schema_editor() as editor: sql = RoomBooking._meta.indexes[0].create_sql(RoomBooking, editor) self.assertRegex(sql, ROOMBOOKING_SQL)
def test_roombooking_create_model(self): with self.schema_editor() as editor: editor.create_model(RoomBooking) self.assertContainsMatch(editor.collected_sql, ROOMBOOKING_SQL)
def test_job_createsql(self): with self.schema_editor() as editor: sql = Job._meta.indexes[0].create_sql(Job, editor) self.assertRegex(sql, JOB_NONUNIQUE_SQL % self.false(editor))
def test_job_create_model(self): with self.schema_editor() as editor: editor.create_model(Job) f = self.false(editor) self.assertContainsMatch(editor.collected_sql, JOB_NONUNIQUE_SQL % f) self.assertContainsMatch(editor.collected_sql, JOB_UNIQUE_SQL % f)
def trigger_editor(self): with DatabaseSchemaEditor(connection) as schema_editor: return DatabaseTriggerEditor(schema_editor)
def test_create_model_no_function(self): class NoWeightedColumns(models.Model): search = SearchVectorField() with DatabaseSchemaEditor(connection) as schema_editor: schema_editor.create_model(NoWeightedColumns) self.assertEqual(len(schema_editor.deferred_sql), 1) self.assertIn('CREATE INDEX', schema_editor.deferred_sql[0]) self.assertIn('GIN', schema_editor.deferred_sql[0])
def test_create_model(self): class TextDocument(models.Model): title = models.CharField(max_length=128) search = SearchVectorField([ WeightedColumn('title', 'A'), ], 'english') with DatabaseSchemaEditor(connection) as schema_editor: schema_editor.create_model(TextDocument) self.assertEqual(len(schema_editor.deferred_sql), 3) self.assertIn('CREATE INDEX', schema_editor.deferred_sql[0]) self.assertIn('GIN', schema_editor.deferred_sql[0]) self.assertIn('CREATE FUNCTION', schema_editor.deferred_sql[1]) self.assertIn('CREATE TRIGGER', schema_editor.deferred_sql[2])
def apply_operations(self, app_label, project_state, operations): migration = Migration('name', app_label) migration.operations = operations with connection.schema_editor() as editor: return migration.apply(project_state, editor)
def unapply_operations(self, app_label, project_state, operations): migration = Migration('name', app_label) migration.operations = operations with connection.schema_editor() as editor: return migration.unapply(project_state, editor)
def test_create_model(self): operation = migrations.CreateModel("Pony", [ ('pony_id', models.AutoField(primary_key=True)), ('pink', models.IntegerField(default=1)), ]) project_state = ProjectState() new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertTableNotExists('tests_pony') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertTableExists('tests_pony') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertTableNotExists('tests_pony')
def test_delete_model(self): project_state = self.set_up_test_model() operation = migrations.DeleteModel("Pony") new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertTableExists('tests_pony') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertTableNotExists('tests_pony') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertTableExists('tests_pony')
def test_rename_model(self): project_state = self.set_up_test_model() operation = migrations.RenameModel("Pony", "Horse") new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertTableExists('tests_pony') self.assertTableNotExists('tests_horse') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertTableExists('tests_horse') self.assertTableNotExists('tests_pony') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertTableExists('tests_pony') self.assertTableNotExists('tests_horse')
def test_remove_field(self): project_state = self.set_up_test_model() operation = migrations.RemoveField('Pony', 'pink') new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertColumnExists('tests_pony', 'pink') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertColumnNotExists('tests_pony', 'pink') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertColumnExists('tests_pony', 'pink')
def test_remove_foreign_key_field(self): project_state = self.set_up_test_model() operation = migrations.RemoveField('Rider', 'pony') new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertColumnExists('tests_rider', 'pony_id') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertColumnNotExists('tests_rider', 'pony_id') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertColumnExists('tests_rider', 'pony_id')
def test_alter_model_table(self): project_state = self.set_up_test_model() operation = migrations.AlterModelTable('Pony', 'tests_pony_2') new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertTableExists('tests_pony') self.assertTableNotExists('tests_pony_2') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertTableExists('tests_pony_2') self.assertTableNotExists('tests_pony') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertTableExists('tests_pony') self.assertTableNotExists('tests_pony_2')
def test_alter_field(self): project_state = self.set_up_test_model() operation = migrations.AlterField('Pony', 'pink', models.IntegerField(null=True)) new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertColumnNotNull('tests_pony', 'pink') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertColumnNull('tests_pony', 'pink') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertColumnNotNull('tests_pony', 'pink')
def test_rename_field(self): project_state = self.set_up_test_model() operation = migrations.RenameField('Pony', 'pink', 'rosa') new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertColumnExists('tests_pony', 'pink') self.assertColumnNotExists('tests_pony', 'rosa') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertColumnNotExists('tests_pony', 'pink') self.assertColumnExists('tests_pony', 'rosa') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertColumnExists('tests_pony', 'pink') self.assertColumnNotExists('tests_pony', 'rosa')
def test_alter_index_together(self): project_state = self.set_up_test_model() operation = migrations.AlterIndexTogether('Pony', [('pink', 'weight')]) new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertIndexNotExists('tests_pony', ['pink', 'weight']) with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertIndexExists('tests_pony', ['pink', 'weight']) with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertIndexNotExists('tests_pony', ['pink', 'weight'])
def test_alter_order_with_respect_to(self): project_state = self.set_up_test_model() operation = migrations.AlterOrderWithRespectTo('Rider', 'pony') new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertColumnNotExists('tests_rider', '_order') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) self.assertColumnExists('tests_rider', '_order') with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertColumnNotExists('tests_rider', '_order')
def test_constraint_name_method(self): from ..models import AwareModel, NaiveModel, SelfReferentialModel with connection.schema_editor() as editor: self.assertEqual(3, len(editor._constraint_names(AwareModel, index=True))) six.assertCountEqual( self, ['tests_awaremodel_pkey'], editor._constraint_names(AwareModel, primary_key=True) ) six.assertCountEqual(self, [ 'tests_awaremodel_pkey', 'tests_awaremodel_name_key' ], editor._constraint_names(AwareModel, unique=True)) six.assertCountEqual(self, [ 'tests_awaremodel_name_key' ], editor._constraint_names(AwareModel, unique=True, primary_key=False)) six.assertCountEqual(self, ['tests_awaremodel_pkey'], editor._constraint_names(AwareModel, primary_key=True, unique=True)) six.assertCountEqual(self, [], editor._constraint_names(AwareModel, foreign_key=True)) six.assertCountEqual(self, [], editor._constraint_names(AwareModel, foreign_key=True, primary_key=True)) six.assertCountEqual(self, ['tests_awaremodel_factor_check'], editor._constraint_names(AwareModel, check=True)) with connection.schema_editor() as editor: six.assertCountEqual(self, ['tests_naivemodel_pkey'], editor._constraint_names(NaiveModel, primary_key=True)) six.assertCountEqual(self, ['tests_naivemodel_name_key'], editor._constraint_names(NaiveModel, unique=True, primary_key=False)) with connection.schema_editor() as editor: # These constraint names appear to change between different versions of django or python? self.assertEqual(1, len(editor._constraint_names(SelfReferentialModel, foreign_key=True)))
def test_0002_patch_admin_reverse(self): Schema.objects.mass_create('a', 'b', 'c') remove_all_schemata = import_module('boardinghouse.migrations.0002_patch_admin').remove_all_schemata remove_all_schemata(apps, connection.schema_editor()) self.assertEqual(0, Schema.objects.count())
def test_0004_change_sequence_owners(self): Schema.objects.mass_create('a', 'b', 'c') module = import_module('boardinghouse.migrations.0004_change_sequence_owners') module.change_existing_sequence_owners(apps, connection.schema_editor()) # How can I assert that this was executed, and did what it says on the box? module.noop(apps, connection.schema_editor())
def test_clone_schema_with_trigger(self): TRIGGER_FUNCTION = '''CREATE OR REPLACE FUNCTION trigger_this() RETURNS TRIGGER AS $$ BEGIN RAISE EXCEPTION 'Trigger fired correctly'; END; $$ LANGUAGE plpgsql''' TRIGGER_TRIGGER = '''CREATE TRIGGER "test_trigger_this" BEFORE INSERT ON tests_awaremodel FOR EACH STATEMENT EXECUTE PROCEDURE trigger_this()''' project_state = ProjectState() new_state = project_state.clone() trigger_function_op = migrations.RunSQL( sql=TRIGGER_FUNCTION, reverse_sql='DROP FUNCTION trigger_this()' ) trigger_op = migrations.RunSQL( sql=TRIGGER_TRIGGER, reverse_sql='DROP TRIGGER "test_trigger_this" BEFORE EACH UPDATE ON tests_awaremodel' ) trigger_function_op.state_forwards('tests', new_state) trigger_op.state_forwards('tests', new_state) with connection.schema_editor() as editor: trigger_function_op.database_forwards('tests', editor, project_state, new_state) trigger_op.database_forwards('tests', editor, project_state, new_state) Schema.objects.mass_create('a', 'b', 'c') for schema in Schema.objects.all(): schema.activate() with transaction.atomic(): with self.assertRaises(Exception) as exc: self.assertTrue('Trigger fired correctly' == exc.args[0]) AwareModel.objects.create(name='FAILCREATE')
def execute_migration(schema_editor, operations, project=None): """Executes the specified migration operations using the specified schema editor. Arguments: schema_editor: The schema editor to use to execute the migrations. operations: The migration operations to execute. project: The project state to use during the migrations. """ project = project or migrations.state.ProjectState.from_apps(apps) class Migration(migrations.Migration): pass Migration.operations = operations executor = MigrationExecutor(schema_editor.connection) executor.apply_migration( project, Migration('eh', 'postgres_extra'))
def alter_db_table(field, filters: List[str]): """Creates a model with the specified field and then renames the database table. Arguments: field: The field to include into the model. filters: List of strings to filter SQL statements on. """ model = define_fake_model() project = migrations.state.ProjectState.from_apps(apps) with connection.schema_editor() as schema_editor: execute_migration(schema_editor, [ migrations.CreateModel( model.__name__, fields=[ ('title', field.clone()) ] ) ], project) with filtered_schema_editor(*filters) as (schema_editor, calls): execute_migration(schema_editor, [ migrations.AlterModelTable( model.__name__, 'NewTableName' ) ], project) yield calls
def add_field(field, filters: List[str]): """Adds the specified field to a model. Arguments: field: The field to add to a model. filters: List of strings to filter SQL statements on. """ model = define_fake_model() project = migrations.state.ProjectState.from_apps(apps) with connection.schema_editor() as schema_editor: execute_migration(schema_editor, [ migrations.CreateModel( model.__name__, fields=[] ) ], project) with filtered_schema_editor(*filters) as (schema_editor, calls): execute_migration(schema_editor, [ migrations.AddField( model.__name__, 'title', field ) ], project) yield calls
def alter_field(old_field, new_field, filters: List[str]): """Alters a field from one state to the other. Arguments: old_field: The field before altering it. new_field: The field after altering it. filters: List of strings to filter SQL statements on. """ model = define_fake_model({'title': old_field}) project = migrations.state.ProjectState.from_apps(apps) with connection.schema_editor() as schema_editor: execute_migration(schema_editor, [ migrations.CreateModel( model.__name__, fields=[ ('title', old_field.clone()) ] ) ], project) with filtered_schema_editor(*filters) as (schema_editor, calls): execute_migration(schema_editor, [ migrations.AlterField( model.__name__, 'title', new_field ) ], project) yield calls
def rename_field(field, filters: List[str]): """Renames a field from one name to the other. Arguments: field: Field to be renamed. filters: List of strings to filter SQL statements on. """ model = define_fake_model({'title': field}) project = migrations.state.ProjectState.from_apps(apps) with connection.schema_editor() as schema_editor: execute_migration(schema_editor, [ migrations.CreateModel( model.__name__, fields=[ ('title', field.clone()) ] ) ], project) with filtered_schema_editor(*filters) as (schema_editor, calls): execute_migration(schema_editor, [ migrations.RenameField( model.__name__, 'title', 'newtitle' ) ], project) yield calls
def test_run_sql(self): project_state = self.set_up_test_model() operation = migrations.RunSQL( """CREATE TABLE i_love_ponies (id int, special_thing int); CREATE INDEX i_love_ponies_special_idx ON i_love_ponies (special_thing); INSERT INTO i_love_ponies (id, special_thing) VALUES (1, 42); INSERT INTO i_love_ponies (id, special_thing) VALUES (2, 51), (3, 60); DELETE FROM i_love_ponies WHERE special_thing = 42; UPDATE i_love_ponies SET special_thing = 42 WHERE id = 2;""", "DROP TABLE i_love_ponies") new_state = project_state.clone() operation.state_forwards('tests', new_state) self.assertTableNotExists('i_love_ponies') with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) cursor = connection.cursor() cursor.execute('SELECT * FROM a.i_love_ponies') self.assertTableExists('i_love_ponies') self.assertIndexExists('i_love_ponies', ['special_thing']) @all_schemata def objects_exist(cursor, **kwargs): cursor = connection.cursor() cursor.execute('SELECT * FROM i_love_ponies ORDER BY id') result = cursor.fetchmany(4) self.assertTrue(result, 'No objects found in {schema}'.format(**kwargs)) expected = [(2, 42), (3, 60)] self.assertEqual( sorted(expected), sorted(result), 'Mismatch objects found in schema {schema}: expected {0}, saw {1}' .format(expected, result, **kwargs) ) with connection.cursor() as cursor: objects_exist(cursor) with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) self.assertTableNotExists('i_love_ponies')
def test_run_python(self): """ Because this can run arbitrary python code, we can't know which parts of it need to run against each schema, and which parts run against the public schema. We could hack into any generated SQL, and inspect it, looking for table names, attempting to push data to the correct schemata (including executing the SQL multiple times if necessary). Maybe we could fuck with the models generated by project_state.render(), and make their generated SQL do what we need it to do. Although, it looks like Pony.objects is a normal models.Manager class. """ project_state = self.set_up_test_model() def forwards(models, schema_editor): Pony = models.get_model('tests', 'Pony') Pony.objects.create(pink=1, weight=3.55) Pony.objects.create(weight=5) def backwards(models, schema_editor): Pony = models.get_model('tests', 'Pony') Pony.objects.filter(pink=1, weight=3.55).delete() Pony.objects.filter(weight=5).delete() operation = migrations.RunPython(forwards, reverse_code=backwards) new_state = project_state.clone() operation.state_forwards('tests', new_state) Pony = project_state.apps.get_model('tests', 'Pony') @all_schemata def pony_count(count, **kwargs): found = Pony.objects.count() self.assertEqual( count, found, 'Incorrect number of Ponies found in schema ' '{schema}: expected {0}, found {1}'.format(count, found, **kwargs) ) pony_count(0) with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) pony_count(2) with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) pony_count(0)