我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 django.apps.apps。
def inject_pre_migration_operations(plan=None, apps=global_apps, using=DEFAULT_DB_ALIAS, **kwargs):
    """
    Insert an `LTreeExtension` operation in front of the first planned
    `CreateModel` operation that declares an `LTreeField`.

    Only a single extension operation is ever inserted: the function returns
    as soon as the first matching field has been handled. Nothing is done
    when `plan` is None.
    """
    if plan is None:
        return

    for migration, _backward in plan:
        for position, op in enumerate(migration.operations):
            if not isinstance(op, migrations.CreateModel):
                continue
            for _name, candidate in op.fields:
                if isinstance(candidate, LTreeField):
                    migration.operations.insert(position, LTreeExtension())
                    # Stop right away; continuing would keep enumerating a
                    # list that has just been mutated.
                    return
def inject_post_migration_operations(plan=None, apps=global_apps, using=DEFAULT_DB_ALIAS, **kwargs):
    """
    Run `post_migrate_mpathnode` on the model of every planned `CreateModel`
    operation.

    The operations of each migration are visited from last to first, and the
    model class is looked up in `apps` by the migration's app label and the
    operation's name. Nothing is done when `plan` is None.
    """
    if plan is None:
        return

    for migration, _backward in plan:
        # Slice to walk a reversed snapshot of the operations list
        for op in migration.operations[::-1]:
            if not isinstance(op, migrations.CreateModel):
                continue
            created_model = apps.get_model(migration.app_label, op.name)
            post_migrate_mpathnode(created_model)
def test_for_missing_migrations(self):
    """Fail when the current models differ from the state described by the migrations."""
    target_state = ProjectState.from_apps(apps)
    # Skip the test models
    for test_model in ('testbasemodel', 'testbasestatusmodel'):
        target_state.remove_model('onfido', test_model)

    loader = MigrationExecutor(connection).loader
    detector = MigrationAutodetector(
        from_state=loader.project_state(),
        to_state=target_state,
    )

    pending = detector.changes(graph=loader.graph)
    if pending:
        self.fail(
            'Your models have changes that are not yet reflected '
            'in a migration. You should add them now.'
        )
def handle(self, *args, **options):
    """Exit with an error when an app has model changes but no migration for them.

    Every configured database is inspected with a `MigrationAutodetector`
    that compares the migration loader's project state to the state built
    from the currently installed models. App labels listed in
    ``options['ignore']`` are excluded from the result.

    Raises:
        SystemExit: with a message when a database cannot be reached or when
            at least one non-ignored app has unmigrated model changes.
    """
    changed = set()

    self.stdout.write("Checking...")
    for db in settings.DATABASES:
        try:
            executor = MigrationExecutor(connections[db])
        except OperationalError:
            sys.exit("Unable to check migrations: cannot connect to database\n")

        autodetector = MigrationAutodetector(
            executor.loader.project_state(),
            ProjectState.from_apps(apps),
        )
        # `changes()` is keyed by app label; only the labels are needed here
        changed.update(autodetector.changes(graph=executor.loader.graph).keys())

    changed -= set(options['ignore'])

    if changed:
        # Sort so the reported list is stable from run to run
        sys.exit(
            "Apps with model changes but no corresponding migration file: %(changed)s\n" % {
                'changed': sorted(changed)
            })

    # Use the command's own stream (like "Checking..." above) so the message
    # can be captured or redirected by whoever invokes the command.
    self.stdout.write("All migration files present\n")
def update_permissions(sender, app_config, verbosity, apps=global_apps, **kwargs):
    """Append a ``view_<model>`` permission to the ``Meta.permissions`` of models.

    When the ``ADMIN_VIEW_PERMISSION_MODELS`` setting is defined (even as an
    empty collection), only models whose `get_model_name` value is listed in
    it are touched; when the setting is absent, every model of every app in
    `apps` is. A model that already declares the permission is left alone.
    """
    settings_models = getattr(settings, 'ADMIN_VIEW_PERMISSION_MODELS', None)

    # TODO: Maybe look at the registry not in all models
    for app in apps.get_app_configs():
        for model in app.get_models():
            meta = model._meta
            codename = 'view_%s' % meta.model_name

            if settings_models or (settings_models is not None and len(
                    settings_models) == 0):
                # The setting is defined: restrict to the listed models
                if get_model_name(model) not in settings_models:
                    continue

            declared = [perm[0] for perm in meta.permissions]
            if codename in declared:
                continue

            meta.permissions += (
                (codename, 'Can view %s' % meta.model_name),)
def test_for_missing_migrations(self):
    """Fail when the current models differ from the state described by the migrations."""
    target_state = ProjectState.from_apps(apps)
    # Skip tracking changes for TestModel
    target_state.remove_model('elasticsearch_django', 'testmodel')

    loader = MigrationExecutor(connection).loader
    detector = MigrationAutodetector(
        from_state=loader.project_state(),
        to_state=target_state,
    )

    pending = detector.changes(graph=loader.graph)
    if pending:
        self.fail(
            'Your models have changes that are not yet reflected '
            'in a migration. You should add them now.'
        )
def __init__(self):
    """Create a migration simulator with an empty project state and no migrations.

    A dedicated `Apps` registry is populated with a single, randomly named
    app config so the simulator has an app of its own to work with.
    """
    import psqlextra.apps

    self.app_label = self._generate_random_name()

    # Build a throw-away AppConfig subclass named after the random label,
    # then instantiate it against the psqlextra.apps module.
    config_attrs = {
        'name': self.app_label,
        'verbose_name': self.app_label,
    }
    config_class = type(self.app_label, (AppConfig,), config_attrs)
    self.app_config = config_class(self.app_label, psqlextra.apps)
    self.app_config.models = {}

    self.apps = Apps()
    self.apps.ready = False
    self.apps.populate(installed_apps=[self.app_config])

    self.project_state = ProjectState()
    self.migrations = []
def make_migrations(self):
    """Run the auto-detector and record the first migration it produces.

    The state built from ``self.apps`` is compared to the stored project
    state, which is then replaced by the new one.

    Returns:
        The first migration detected under the ``'tests'`` key, or None when
        there is none. The same value is appended to ``self.migrations``.
    """
    next_state = ProjectState.from_apps(self.apps)
    detector = MigrationAutodetector(self.project_state, next_state)
    detected = detector._detect_changes()

    candidates = detected.get('tests', [])
    if len(candidates) > 0:
        migration = candidates[0]
    else:
        migration = None

    self.migrations.append(migration)
    self.project_state = next_state
    return migration
def check_migrations():
    """Exit with an error when `APP_NAME` has model changes without a migration.

    Each configured database is checked with a `MigrationAutodetector`;
    changes detected for other apps are ignored.

    Raises:
        SystemExit: when a database raises `OperationalError` or when
            `APP_NAME` is among the apps with pending changes.
    """
    from django.db.migrations.autodetector import MigrationAutodetector
    from django.db.migrations.executor import MigrationExecutor
    from django.db.migrations.state import ProjectState

    changed = set()

    print("Checking {} migrations...".format(APP_NAME))
    for alias in settings.DATABASES.keys():
        try:
            executor = MigrationExecutor(connections[alias])
        except OperationalError as e:
            sys.exit(
                "Unable to check migrations due to database: {}".format(e)
            )

        loader = executor.loader
        detector = MigrationAutodetector(
            loader.project_state(),
            ProjectState.from_apps(apps),
        )
        pending = detector.changes(graph=loader.graph)
        changed.update(pending.keys())

    if not (changed and APP_NAME in changed):
        print("All migration files present.")
        return

    sys.exit(
        "A migration file is missing. Please run "
        "`python makemigrations.py` to generate it."
    )
def create_default_site(app_config, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, apps=global_apps, **kwargs):
    """Create the ``example.com`` Site on database `using` when it holds no Site yet.

    Does nothing when the Site model cannot be looked up in `apps`, when the
    router disallows migrating it on `using`, or when a Site already exists.
    """
    try:
        Site = apps.get_model('sites', 'Site')
    except LookupError:
        return

    if not router.allow_migrate_model(using, Site):
        return

    if Site.objects.using(using).exists():
        return

    # The default settings set SITE_ID = 1, and some tests in Django's test
    # suite rely on this value. However, if database sequences are reused
    # (e.g. in the test suite after flush/syncdb), it isn't guaranteed that
    # the next id will be 1, so we coerce it. See #15573 and #16353. This
    # can also crop up outside of tests - see #15346.
    verbose = verbosity >= 2
    if verbose:
        print("Creating example.com Site object")
    default_site = Site(pk=getattr(settings, 'SITE_ID', 1), domain="example.com", name="example.com")
    default_site.save(using=using)

    # We set an explicit pk instead of relying on auto-incrementation,
    # so we need to reset the database sequence. See #17415.
    db = connections[using]
    sequence_sql = db.ops.sequence_reset_sql(no_style(), [Site])
    if not sequence_sql:
        return

    if verbose:
        print("Resetting sequence")
    with connections[using].cursor() as cursor:
        for statement in sequence_sql:
            cursor.execute(statement)
def inject_rename_contenttypes_operations(plan=None, apps=global_apps, using=DEFAULT_DB_ALIAS, **kwargs):
    """
    Insert a `RenameContentType` operation after every planned `RenameModel`
    operation.

    Does nothing when `plan` is None or when the router disallows migrating
    the ContentType model on `using`. Migrations are skipped for as long as
    the ContentType model is considered unavailable, and processing stops
    entirely once the plan unapplies ``contenttypes.0001_initial``.
    """
    if plan is None:
        return

    # Determine whether or not the ContentType model is available.
    try:
        ContentType = apps.get_model('contenttypes', 'ContentType')
    except LookupError:
        available = False
    else:
        if not router.allow_migrate_model(using, ContentType):
            return
        available = True

    for migration, backward in plan:
        if ((migration.app_label, migration.name) == ('contenttypes', '0001_initial')):
            # There's no point in going forward if the initial contenttypes
            # migration is unapplied as the ContentType model will be
            # unavailable from this point.
            if backward:
                break
            else:
                # Applying the initial migration makes the model available
                # for every migration that follows in the plan.
                available = True
                continue
        # The ContentType model is not available yet.
        if not available:
            continue
        # Collect (position, operation) pairs first so the operations list is
        # not modified while it is being enumerated.
        inserts = []
        for index, operation in enumerate(migration.operations):
            if isinstance(operation, migrations.RenameModel):
                operation = RenameContentType(
                    migration.app_label, operation.old_name_lower, operation.new_name_lower
                )
                # index + 1 places the new operation right after the RenameModel
                inserts.append((index + 1, operation))
        # Each earlier insertion shifts the later target positions by one,
        # hence the `inserted` offset.
        for inserted, (index, operation) in enumerate(inserts):
            migration.operations.insert(inserted + index, operation)
def create_contenttypes(app_config, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, apps=global_apps, **kwargs):
    """
    Create the content types that are still missing for the models of the
    given app.

    Does nothing when the app has no models module, when the app config or
    the ContentType model cannot be looked up in `apps`, or when
    `get_contenttypes_and_models` reports no models for the app.
    """
    if not app_config.models_module:
        return

    app_label = app_config.label
    try:
        app_config = apps.get_app_config(app_label)
        ContentType = apps.get_model('contenttypes', 'ContentType')
    except LookupError:
        return

    content_types, app_models = get_contenttypes_and_models(app_config, using, ContentType)
    if not app_models:
        return

    # Keep only the models that have no content type recorded yet
    missing = []
    for model_name, _model in six.iteritems(app_models):
        if model_name in content_types:
            continue
        missing.append(ContentType(app_label=app_label, model=model_name))

    ContentType.objects.using(using).bulk_create(missing)

    if verbosity < 2:
        return
    for ct in missing:
        print("Adding content type '%s | %s'" % (ct.app_label, ct.model))
def add_arguments(self, parser):
    """Register the ``--ignore`` option on the command's argument parser.

    ``--ignore`` accepts one or more app labels given as separate arguments
    (``nargs='+'``) and stores them as a list under ``ignore``. It defaults
    to an empty list when the option is not given.
    """
    parser.add_argument(
        '--ignore',
        action='store',
        nargs='+',
        dest='ignore',
        default=[],
        # nargs='+' consumes whitespace-separated values and does not split
        # on commas, so the help text must not promise a comma separated list.
        help="Space separated list of apps to ignore missing migration files. "
             "Useful for specifying third-party ones here.")
def ready(self):
    """Load the signal handlers and run plugin template autodiscovery."""
    # Imported only so that the signal handlers get loaded
    from . import handlers

    # Autodiscovery loads all cms_plugins.py files under the installed apps
    # to identify custom plugin templates.
    plugin_templates_registry.autodiscover(
        [config.name for config in apps.app_configs.values()]
    )
def test_migration_startswith_to_matches(self):
    """Forwards turns a 'path' condition into 'path matches' with a '^'-prefixed value."""
    flag_state = FlagState.objects.create(
        name='MY_FLAG',
        condition='path',
        value='/my/path',
    )

    self.migration.forwards(apps, None)

    # Reload to observe what the migration wrote to the database
    flag_state.refresh_from_db()
    self.assertEqual(flag_state.condition, 'path matches')
    self.assertEqual(flag_state.value, '^/my/path')
def test_migration_startswith_to_matches_backwards(self):
    """Backwards turns a 'path matches' condition into 'path' and drops the leading '^'."""
    flag_state = FlagState.objects.create(
        name='MY_FLAG',
        condition='path matches',
        value='^/my/path',
    )

    self.migration.backwards(apps, None)

    # Reload to observe what the migration wrote to the database
    flag_state.refresh_from_db()
    self.assertEqual(flag_state.condition, 'path')
    self.assertEqual(flag_state.value, '/my/path')
def handle(self, *args, **kwargs):
    """Report apps whose model changes have no migration and exit accordingly.

    Every configured database is checked. The process always terminates via
    `sys.exit`: status 0 when nothing is pending, status 1 when a database
    cannot be reached or when at least one app has pending changes.
    """
    changed = set()

    self.stdout.write("Checking...")
    for alias in settings.DATABASES.keys():
        try:
            executor = MigrationExecutor(connections[alias])
        except OperationalError:
            self.stdout.write("Unable to check migrations: cannot connect to database '{}'.\n".format(alias))
            sys.exit(1)

        all_apps = apps.app_configs.keys()
        questioner = InteractiveMigrationQuestioner(specified_apps=all_apps, dry_run=True)
        loader = executor.loader
        autodetector = MigrationAutodetector(
            loader.project_state(),
            ProjectState.from_apps(apps),
            questioner,
        )
        pending = autodetector.changes(graph=loader.graph, convert_apps=all_apps)
        changed.update(pending.keys())

    if not changed:
        self.stdout.write("All migration files present.\n")
        sys.exit(0)

    self.stdout.write(
        "Apps with model changes but no corresponding migration file: {!r}\n".format(
            list(changed)
        )
    )
    sys.exit(1)
def test_0002_patch_admin_reverse(self):
    """`remove_all_schemata` from migration 0002 leaves no Schema objects behind."""
    Schema.objects.mass_create('a', 'b', 'c')

    # The module name starts with a digit, so it has to be imported by string
    migration_module = import_module('boardinghouse.migrations.0002_patch_admin')
    migration_module.remove_all_schemata(apps, connection.schema_editor())

    self.assertEqual(0, Schema.objects.count())
def test_0004_change_sequence_owners(self):
    """Run both callables of migration 0004; the test only checks that they do not raise."""
    Schema.objects.mass_create('a', 'b', 'c')
    migration_module = import_module('boardinghouse.migrations.0004_change_sequence_owners')

    # TODO: How can I assert that `change_existing_sequence_owners` was
    # executed, and did what it says on the box?
    for step_name in ('change_existing_sequence_owners', 'noop'):
        step = getattr(migration_module, step_name)
        step(apps, connection.schema_editor())
def execute_migration(schema_editor, operations, project=None):
    """Apply the given migration operations through the schema editor's connection.

    Arguments:
        schema_editor:
            The schema editor whose connection is used
            to execute the migration.

        operations:
            The migration operations to execute.

        project:
            The project state to use during the migration.
            When falsy, a state built from the installed
            apps is used instead.
    """
    if not project:
        project = migrations.state.ProjectState.from_apps(apps)

    class Migration(migrations.Migration):
        pass

    # Attached after class creation: a class body cannot read the enclosing
    # function's `operations` under the same name.
    Migration.operations = operations

    migration = Migration('eh', 'postgres_extra')
    MigrationExecutor(schema_editor.connection).apply_migration(project, migration)
def alter_db_table(field, filters: List[str]):
    """Create a fake model holding `field`, then rename its database table.

    The model is created through a regular schema editor; the rename to
    'NewTableName' runs through `filtered_schema_editor`.

    Arguments:
        field:
            The field to include in the model
            (a clone of it is used, as 'title').

        filters:
            List of strings to filter SQL
            statements on.

    Yields:
        The `calls` object produced by `filtered_schema_editor`.
    """
    model = define_fake_model()
    project = migrations.state.ProjectState.from_apps(apps)
    model_name = model.__name__

    with connection.schema_editor() as schema_editor:
        create_ops = [
            migrations.CreateModel(model_name, fields=[('title', field.clone())]),
        ]
        execute_migration(schema_editor, create_ops, project)

    with filtered_schema_editor(*filters) as (schema_editor, calls):
        rename_ops = [
            migrations.AlterModelTable(model_name, 'NewTableName'),
        ]
        execute_migration(schema_editor, rename_ops, project)

    yield calls
def add_field(field, filters: List[str]):
    """Create an empty fake model, then add `field` to it as 'title'.

    The model is created through a regular schema editor; the field is added
    through `filtered_schema_editor`.

    Arguments:
        field:
            The field to add to the model.

        filters:
            List of strings to filter SQL
            statements on.

    Yields:
        The `calls` object produced by `filtered_schema_editor`.
    """
    model = define_fake_model()
    project = migrations.state.ProjectState.from_apps(apps)
    model_name = model.__name__

    with connection.schema_editor() as schema_editor:
        create_ops = [
            migrations.CreateModel(model_name, fields=[]),
        ]
        execute_migration(schema_editor, create_ops, project)

    with filtered_schema_editor(*filters) as (schema_editor, calls):
        add_ops = [
            migrations.AddField(model_name, 'title', field),
        ]
        execute_migration(schema_editor, add_ops, project)

    yield calls
def remove_field(field, filters: List[str]):
    """Create a fake model with `field` as 'title', then remove that field.

    The model is created through a regular schema editor; the field is
    removed through `filtered_schema_editor`.

    Arguments:
        field:
            The field to remove from the model
            (a clone of it is used on creation).

        filters:
            List of strings to filter SQL
            statements on.

    Yields:
        The `calls` object produced by `filtered_schema_editor`.
    """
    model = define_fake_model({'title': field})
    project = migrations.state.ProjectState.from_apps(apps)
    model_name = model.__name__

    with connection.schema_editor() as schema_editor:
        create_ops = [
            migrations.CreateModel(model_name, fields=[('title', field.clone())]),
        ]
        execute_migration(schema_editor, create_ops, project)

    with filtered_schema_editor(*filters) as (schema_editor, calls):
        remove_ops = [
            migrations.RemoveField(model_name, 'title'),
        ]
        execute_migration(schema_editor, remove_ops, project)

    yield calls
def alter_field(old_field, new_field, filters: List[str]):
    """Create a fake model with `old_field` as 'title', then alter it to `new_field`.

    The model is created through a regular schema editor; the alteration
    runs through `filtered_schema_editor`.

    Arguments:
        old_field:
            The field before altering it
            (a clone of it is used on creation).

        new_field:
            The field after altering it.

        filters:
            List of strings to filter SQL
            statements on.

    Yields:
        The `calls` object produced by `filtered_schema_editor`.
    """
    model = define_fake_model({'title': old_field})
    project = migrations.state.ProjectState.from_apps(apps)
    model_name = model.__name__

    with connection.schema_editor() as schema_editor:
        create_ops = [
            migrations.CreateModel(model_name, fields=[('title', old_field.clone())]),
        ]
        execute_migration(schema_editor, create_ops, project)

    with filtered_schema_editor(*filters) as (schema_editor, calls):
        alter_ops = [
            migrations.AlterField(model_name, 'title', new_field),
        ]
        execute_migration(schema_editor, alter_ops, project)

    yield calls
def create_permissions(app_config, verbosity=2, interactive=True, using=DEFAULT_DB_ALIAS, apps=global_apps, **kwargs):
    """Create the Permission rows that are still missing for the models of an app.

    Does nothing when the app has no models module, when the app config or
    the ContentType/Permission models cannot be looked up in `apps`, or when
    the router disallows migrating Permission on `using`.
    """
    if not app_config.models_module:
        return

    app_label = app_config.label
    try:
        app_config = apps.get_app_config(app_label)
        ContentType = apps.get_model('contenttypes', 'ContentType')
        Permission = apps.get_model('auth', 'Permission')
    except LookupError:
        return

    if not router.allow_migrate_model(using, Permission):
        return

    # The permissions we're looking for, as (content_type, (codename, name))
    wanted = []
    # The content types those permissions belong to
    ctypes = set()
    for klass in app_config.get_models():
        # Force looking up the content types in the current database
        # before creating foreign keys to them.
        ctype = ContentType.objects.db_manager(using).get_for_model(klass)
        ctypes.add(ctype)
        wanted.extend((ctype, perm) for perm in _get_all_permissions(klass._meta))

    # Find all the Permissions that have a content_type for a model we're
    # looking for. We don't need to check for codenames since we already have
    # a list of the ones we're going to create.
    existing_rows = Permission.objects.using(using).filter(
        content_type__in=ctypes,
    ).values_list(
        "content_type", "codename"
    )
    existing = set(existing_rows)

    to_create = []
    for ct, (codename, name) in wanted:
        if (ct.pk, codename) in existing:
            continue
        to_create.append(Permission(codename=codename, name=name, content_type=ct))

    Permission.objects.using(using).bulk_create(to_create)

    if verbosity >= 2:
        for perm in to_create:
            print("Adding permission '%s'" % perm)