我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.apps.apps.get_model()。
def _get_content_file(self, app_label, model_name, field_name, fname):
    """
    Open a file that was previously recorded as written for a model field.

    The names recorded in ``self.saves_by_field`` under the key
    ``(app_label, model_name, field_name)`` are consulted first; only a
    name present there is opened, through the field's storage, with
    ``open_no_log``.

    Raises ``StatsTestStorageError`` when nothing has been recorded for the
    field, or when ``fname`` is not among the recorded names.
    """
    field_key = (app_label, model_name, field_name)
    try:
        recorded_names = self.saves_by_field[field_key]
    except KeyError:
        never_written = "{}.{}.{} has not been written to yet so there is nothing to read."
        raise StatsTestStorageError(
            never_written.format(app_label, model_name, field_name))

    if fname not in recorded_names:
        unknown_name = (
            "{}.{}.{} has not had a file named '{}' written to it. "
            "Be careful - the storage engine may have added some random "
            "characters to the name before attempting to write."
        )
        raise StatsTestStorageError(
            unknown_name.format(app_label, model_name, field_name, fname))

    model = apps.get_model(app_label, model_name)
    target_field = model._meta.get_field(field_name)
    return target_field.storage.open_no_log(fname)
def _from_tables(self): if hasattr(self._view_meta, "from_tables"): return self._view_meta.from_tables from django.apps import apps from_models = self._view_meta.from_models from_tables = set() for label in from_models: if "." in label: app_label, model_name = label.split(".") model = apps.get_model(app_label=app_label, model_name=model_name) if issubclass(model, View): from_tables = from_tables | model._from_tables() else: from_tables.add(model._meta.db_table) else: from_tables.add(label) setattr(self._view_meta, "from_tables", from_tables) return from_tables
def _from_view_models(self): if hasattr(self._view_meta, "from_view_models"): return self._view_meta.from_view_models tables = { model._meta.db_table: model for model in apps.get_models() } from_models = self._view_meta.from_models from_view_models = set() for label in from_models: if "." in label: app_label, model_name = label.split(".") model = apps.get_model(app_label=app_label, model_name=model_name) if issubclass(model, View): from_view_models.add(model) else: model = tables[label] if issubclass(model, View): from_view_models.add(model) setattr(self._view_meta, "from_view_models", from_view_models) return from_view_models
def _get_model_from_node(self, node, attr):
    """
    Resolve the model named by attribute ``attr`` of an XML ``node``
    (e.g. ``<object model=...>`` or ``<field rel=... to=...>``).

    Raises ``base.DeserializationError`` if the attribute is empty or
    missing, or if the identifier cannot be resolved to a model.
    """
    model_identifier = node.getAttribute(attr)
    if model_identifier:
        try:
            return apps.get_model(model_identifier)
        except (LookupError, TypeError):
            raise base.DeserializationError(
                "<%s> node has invalid model identifier: '%s'"
                % (node.nodeName, model_identifier))
    raise base.DeserializationError(
        "<%s> node is missing the required '%s' attribute"
        % (node.nodeName, attr))
def _load_field(app_label, model_name, field_name):
    """Return the field ``field_name`` of the model ``app_label.model_name``."""
    model = apps.get_model(app_label, model_name)
    return model._meta.get_field(field_name)


# A guide to Field parameters:
#
#   * name:      The name of the field specified in the model.
#   * attname:   The attribute to use on the model object. This is the same as
#                "name", except in the case of ForeignKeys, where "_id" is
#                appended.
#   * db_column: The db_column specified in the model (or None).
#   * column:    The database column for this field. This is the same as
#                "attname", except if db_column is specified.
#
# Code that introspects values, or does other dynamic things, should use
# attname. For example, this gets the primary key value of object "obj":
#
#     getattr(obj, opts.pk.attname)
def _rename(self, apps, schema_editor, old_model, new_model):
    """
    Point the content type of ``self.app_label``/``old_model`` at
    ``new_model`` on the schema editor's database.

    Does nothing when the router disallows migrating ``ContentType`` on that
    database or when no content type exists for ``old_model``. If saving the
    renamed row raises ``IntegrityError``, the in-memory instance is reset
    to ``old_model`` and the cache is left untouched.
    """
    ContentType = apps.get_model('contenttypes', 'ContentType')
    db = schema_editor.connection.alias
    if not router.allow_migrate_model(db, ContentType):
        return

    manager = ContentType.objects.db_manager(db)
    try:
        content_type = manager.get_by_natural_key(self.app_label, old_model)
    except ContentType.DoesNotExist:
        # Nothing to rename.
        return

    content_type.model = new_model
    try:
        with transaction.atomic(using=db):
            content_type.save(update_fields={'model'})
    except IntegrityError:
        # Gracefully fall back if a stale content type causes a conflict;
        # update_contenttypes will take care of asking the user what
        # should be done next.
        content_type.model = old_model
    else:
        # Clear the cache, as the `get_by_natural_key()` call above cached
        # the renamed ContentType instance under its old model name.
        ContentType.objects.clear_cache()
def context(self, context):
    """
    Add a ``'btns'`` list describing the quick buttons to ``context``.

    Each entry of ``self.q_btns`` yields one dict. Entries with a
    ``'model'`` key are skipped unless ``self.user`` has the model's
    ``view`` permission, and get their url/title/icon from the model;
    other entries have their ``'url'`` reversed, falling back to the raw
    value when it cannot be reversed. An explicit ``'title'`` or ``'icon'``
    in the entry overrides the derived one.
    """
    buttons = []
    for spec in self.q_btns:
        entry = {}
        if 'model' in spec:
            model = self.get_model(spec['model'])
            opts = model._meta
            if not self.user.has_perm("%s.view_%s" % (opts.app_label, opts.model_name)):
                continue
            view_name = "%s:%s_%s_%s" % (
                self.admin_site.app_name,
                opts.app_label,
                opts.model_name,
                spec.get('view', 'changelist'),
            )
            entry['url'] = reverse(view_name)
            entry['title'] = opts.verbose_name
            entry['icon'] = self.dashboard.get_model_icon(model)
        else:
            try:
                entry['url'] = reverse(spec['url'])
            except NoReverseMatch:
                # Not a URL name: use the configured value as the URL itself.
                entry['url'] = spec['url']

        # Explicit settings win over anything derived above.
        for key in ('title', 'icon'):
            if key in spec:
                entry[key] = spec[key]
        buttons.append(entry)

    context.update({'btns': buttons})
def parse_apps_and_model_labels(labels):
    """
    Split a list of "app_label.ModelName" / "app_label" strings into the
    objects they name.

    Returns a two-element tuple ``(set of model classes, set of
    app_configs)``. Raises ``CommandError`` if a named model or app does
    not exist.
    """
    app_configs = set()
    model_classes = set()

    for label in labels:
        if '.' in label:
            try:
                found_model = installed_apps.get_model(label)
            except LookupError:
                raise CommandError('Unknown model: %s' % label)
            else:
                model_classes.add(found_model)
        else:
            try:
                found_config = installed_apps.get_app_config(label)
            except LookupError as e:
                raise CommandError(str(e))
            else:
                app_configs.add(found_config)

    return model_classes, app_configs
def _get_sitemap_full_url(sitemap_url):
    """
    Return ``http://<current site domain><sitemap_url>``.

    When ``sitemap_url`` is None it is auto-detected by reversing the
    "index" sitemap view and then the "global" sitemap view.

    Raises ``ImproperlyConfigured`` if ``django.contrib.sites`` is not
    installed, and ``SitemapNotFound`` if no URL was given and none could
    be detected.
    """
    if not django_apps.is_installed('django.contrib.sites'):
        raise ImproperlyConfigured("ping_google requires django.contrib.sites, which isn't installed.")

    if sitemap_url is None:
        # Try the "index" sitemap URL first, then the "global" one.
        candidate_views = (
            'django.contrib.sitemaps.views.index',
            'django.contrib.sitemaps.views.sitemap',
        )
        for view_name in candidate_views:
            try:
                sitemap_url = reverse(view_name)
            except NoReverseMatch:
                continue
            break

    if sitemap_url is None:
        raise SitemapNotFound("You didn't provide a sitemap_url, and the sitemap URL couldn't be auto-detected.")

    Site = django_apps.get_model('sites.Site')
    domain = Site.objects.get_current().domain
    return 'http://%s%s' % (domain, sitemap_url)
def get_user_model():
    """
    Return the model named by ``settings.AUTH_USER_MODEL``.

    Raises ``ImproperlyConfigured`` if the setting is not of the form
    'app_label.model_name' or names a model that is not installed.
    """
    try:
        user_model = django_apps.get_model(settings.AUTH_USER_MODEL, require_ready=False)
    except ValueError:
        raise ImproperlyConfigured("AUTH_USER_MODEL must be of the form 'app_label.model_name'")
    except LookupError:
        not_installed = (
            "AUTH_USER_MODEL refers to model '%s' that has not been installed" % settings.AUTH_USER_MODEL
        )
        raise ImproperlyConfigured(not_installed)
    return user_model
def _resolve_model(obj):
    """
    Resolve supplied `obj` to a Django model class.

    `obj` must be a Django model class itself, or a string of the form
    'appname.ModelName'. This helps in situations like GH #1225, where
    Django may not have resolved a string-based reference to a model in
    another model's foreign key definition.

    Raises `ImproperlyConfigured` if the registry returns None for the
    string, and `ValueError` if `obj` is neither a two-part dotted string
    nor a model class.
    """
    if isinstance(obj, six.string_types):
        parts = obj.split('.')
        if len(parts) == 2:
            app_name, model_name = parts
            resolved_model = apps.get_model(app_name, model_name)
            if resolved_model is None:
                msg = "Django did not return a model for {0}.{1}"
                raise ImproperlyConfigured(msg.format(app_name, model_name))
            return resolved_model

    if inspect.isclass(obj) and issubclass(obj, models.Model):
        return obj

    raise ValueError("{0} is not a Django model".format(obj))
def email_share(self, user, email, type, id):
    """
    Build the e-mail that would share an object with ``email``.

    The model is looked up from ``self.AVAILABLE_TYPES[type]`` and the
    instance by ``id``. Returns a 404 response if the instance does not
    exist; otherwise a 202 response carrying a copy of the e-mail body.
    No e-mail is actually sent here (see the TODO below).
    """
    # Formatter that renders the shared instance for the e-mail body
    formatter = self.get_email_formatter(type)

    # Look up the object being shared; a missing object is a 404
    try:
        shared_model = apps.get_model(self.AVAILABLE_TYPES[type])
        instance = shared_model.objects.get(id=id)
    except ObjectDoesNotExist:
        not_found = {"message": f"Object with id {id} does not exist"}
        return Response(not_found, status=status.HTTP_404_NOT_FOUND)

    email_body = {
        "to": email,
        "subject": f"[TalentMAP] Shared {type}",
        "body": formatter(instance)
    }

    # TODO: Implement actual e-mail sending here when available e-mail servers are clarified

    # Respond 202 ACCEPTED, echoing the e-mail body back to the caller
    payload = {
        "message": f"Position shared externally via email at {email}",
        "email_body": email_body,
    }
    return Response(payload, status=status.HTTP_202_ACCEPTED)
def internal_share(self, user, email, type, id):
    """
    Share an object with another user, identified by e-mail address.

    Returns a 404 response if the object (model from
    ``self.AVAILABLE_TYPES[type]``, looked up by ``id``) or the receiving
    user's profile does not exist. Otherwise creates a ``Sharable`` from
    ``user.profile`` to the receiving profile and returns a 202 response.
    """
    sharable_model = self.AVAILABLE_TYPES[type]

    # The object being shared must exist; the instance itself is not needed
    try:
        apps.get_model(sharable_model).objects.get(id=id)
    except ObjectDoesNotExist:
        not_found = {"message": f"Object with id {id} does not exist"}
        return Response(not_found, status=status.HTTP_404_NOT_FOUND)

    # The receiving user is identified by e-mail address
    try:
        receiving_user = UserProfile.objects.get(user__email=email)
    except ObjectDoesNotExist:
        no_user = {"message": f"User with email {email} does not exist"}
        return Response(no_user, status=status.HTTP_404_NOT_FOUND)

    # Record the share from the source user to the receiving user
    Sharable.objects.create(
        sharing_user=user.profile,
        receiving_user=receiving_user,
        sharable_id=id,
        sharable_model=sharable_model,
    )

    accepted = {"message": f"Position shared internally to user with email {email}"}
    return Response(accepted, status=status.HTTP_202_ACCEPTED)
def test_endpoints_list(authorized_client, authorized_user, endpoint, model, recipe, retrievable):
    """
    Create 5-10 objects, then check that the list endpoint responds 200 and
    returns exactly that many results, matching the model's row count.
    """
    number = random.randint(5, 10)

    # Prefer the recipe when one is given: either a callable invoked once
    # per object, or a recipe name handed to mommy. Otherwise use the model.
    if recipe and callable(recipe):
        for _ in range(number):
            recipe()
    elif recipe:
        mommy.make_recipe(recipe, _quantity=number)
    elif model:
        mommy.make(model, _quantity=number)

    response = authorized_client.get(endpoint)

    assert response.status_code == status.HTTP_200_OK
    assert len(response.data["results"]) == apps.get_model(model).objects.count()
    assert len(response.data["results"]) == number
def test_endpoints_retrieve(authorized_client, authorized_user, endpoint, model, recipe, retrievable):
    """
    Create 5-10 objects, then check that every object of the model can be
    fetched individually from ``{endpoint}{id}/`` with a 200 response.

    Endpoints flagged as not retrievable are skipped.
    """
    if not retrievable:
        # Endpoint does not support "retrieve" actions.
        return

    number = random.randint(5, 10)

    # Prefer the recipe when one is given: either a callable invoked once
    # per object, or a recipe name handed to mommy. Otherwise use the model.
    if recipe and callable(recipe):
        for _ in range(number):
            recipe()
    elif recipe:
        mommy.make_recipe(recipe, _quantity=number)
    elif model:
        mommy.make(model, _quantity=number)

    # Every stored object must be individually retrievable
    object_ids = apps.get_model(model).objects.values_list('id', flat=True)
    for id in object_ids:
        response = authorized_client.get(f"{endpoint}{id}/")
        assert response.status_code == status.HTTP_200_OK
def handle(self, *args, **options):
    """
    Rename a tracked field in the stored field history of one model.

    Reads the ``model``, ``from_field`` and ``to_field`` options, raising
    ``CommandError`` if any is missing or empty, then updates every
    ``FieldHistory`` row of that model's content type whose ``field_name``
    is ``from_field`` so that it reads ``to_field``.
    """
    model_name = options.get('model')
    from_field = options.get('from_field')
    to_field = options.get('to_field')

    # Checked in this order; the first missing option is reported.
    required = (
        (model_name, '--model_name is a required argument'),
        (from_field, '--from_field is a required argument'),
        (to_field, '--to_field is a required argument'),
    )
    for value, complaint in required:
        if not value:
            raise CommandError(complaint)

    content_type = ContentType.objects.get_for_model(apps.get_model(model_name))
    field_histories = FieldHistory.objects.filter(
        content_type=content_type,
        field_name=from_field,
    )

    self.stdout.write('Updating {} FieldHistory object(s)\n'.format(field_histories.count()))
    field_histories.update(field_name=to_field)
def handle(self, **options):
    """
    Deprecated ``syncdb`` entry point: warn, run ``migrate`` with the same
    options, then offer to create a superuser.

    The offer is made only when the auth ``Permission`` model is installed,
    no user exists yet, and the ``interactive`` option is truthy. The prompt
    repeats until the answer is exactly "yes" or "no".
    """
    warnings.warn("The syncdb command will be removed in Django 1.9", RemovedInDjango19Warning)
    call_command("migrate", **options)

    try:
        apps.get_model('auth', 'Permission')
    except LookupError:
        # Auth is not installed, so there is no superuser to offer.
        return

    UserModel = get_user_model()
    if UserModel._default_manager.exists() or not options.get('interactive'):
        return

    msg = ("\nYou have installed Django's auth system, and "
           "don't have any superusers defined.\nWould you like to create one "
           "now? (yes/no): ")
    confirm = input(msg)
    while confirm not in ('yes', 'no'):
        confirm = input('Please enter either "yes" or "no": ')

    if confirm == 'yes':
        call_command("createsuperuser", interactive=True, database=options['database'])
def get_model(model_path):
    """
    Return the model class named by ``model_path`` ('app_label.model_name').

    Uses the app registry (``django.apps.apps``) when it can be imported,
    and falls back to the legacy ``django.db.models.get_model`` loader
    otherwise.

    Raises ``ImproperlyConfigured`` from the legacy path if ``model_path``
    is not of the form 'app_label.model_name' or the loader returns None.
    Exceptions raised by the app registry lookup propagate unchanged.
    """
    # Only the import decides which loader is used. Keeping the lookup out
    # of the ``try`` means an ImportError raised while resolving the model
    # is not mistaken for "Django < 1.7" and masked by the fallback.
    try:
        from django.apps import apps
    except ImportError:
        pass
    else:
        return apps.get_model(model_path)

    # Django < 1.7 (cannot use the new app loader)
    from django.db.models import get_model as django_get_model

    try:
        app_label, model_name = model_path.split('.')
    except ValueError:
        raise ImproperlyConfigured("SAML_USER_MODEL must be of the form "
                                   "'app_label.model_name'")

    user_model = django_get_model(app_label, model_name)
    if user_model is None:
        raise ImproperlyConfigured("SAML_USER_MODEL refers to model '%s' "
                                   "that has not been installed" % model_path)
    return user_model
def get_media_model():
    """
    Return the media model configured by ``settings.WAGTAILMEDIA_MEDIA_MODEL``.

    Falls back to ``Media`` when reading or splitting the setting raises
    ``AttributeError`` (for example, when the setting is not defined).

    Raises ``ImproperlyConfigured`` if the setting is not of the form
    'app_label.model_name' or names a model that is not installed.
    """
    from django.conf import settings
    from django.apps import apps

    try:
        app_label, model_name = settings.WAGTAILMEDIA_MEDIA_MODEL.split('.')
    except AttributeError:
        return Media
    except ValueError:
        raise ImproperlyConfigured("WAGTAILMEDIA_MEDIA_MODEL must be of the form 'app_label.model_name'")

    not_installed = (
        "WAGTAILMEDIA_MEDIA_MODEL refers to model '%s' that has not been installed"
        % settings.WAGTAILMEDIA_MEDIA_MODEL
    )

    # The app registry signals an unknown app or model with LookupError
    # rather than returning None, so translate it into the configuration
    # error this function promises.
    try:
        media_model = apps.get_model(app_label, model_name)
    except LookupError:
        raise ImproperlyConfigured(not_installed)

    # Kept for loaders that report a missing model by returning None.
    if media_model is None:
        raise ImproperlyConfigured(not_installed)
    return media_model


# Receive the pre_delete signal and delete the file associated with the model instance.
def _resolve_model(obj):
    """
    Resolve supplied `obj` to a Django model class.

    `obj` must be a Django model class itself, or a string of the form
    'appname.ModelName'. This helps in situations like GH #1225, where
    Django may not have resolved a string-based reference to a model in
    another model's foreign key definition.

    Raises `ValueError` if `obj` is neither a two-part dotted string nor a
    model class.
    """
    if isinstance(obj, six.string_types):
        parts = obj.split('.')
        if len(parts) == 2:
            app_name, model_name = parts
            return apps.get_model(app_name, model_name)

    if inspect.isclass(obj) and issubclass(obj, models.Model):
        return obj

    raise ValueError("{0} is not a Django model".format(obj))
def instance_from_str(instance_str):
    """
    Parse an instance string of the form "app.Model:pk" into
    ``(model, instance)``.

    ``instance`` is ``None`` when the pk part is empty.

    Raises ``ValueError`` if the string does not match the expected
    pattern, names an unknown model, or refers to a pk that does not exist.
    """
    match = instance_str_re.match(instance_str)
    if not match:
        raise ValueError("Invalid instance string")

    try:
        model = apps.get_model(match.group(1))
    except (LookupError, ValueError):
        raise ValueError("Invalid instance string")

    pk = match.group(2)
    if not pk:
        return model, None

    try:
        instance = model._default_manager.get(pk=pk)
    except model.DoesNotExist:
        raise ValueError("Invalid instance string")
    return model, instance
def saml_register(saml_id, user_attributes, token=None):
    """
    Return the user associated with ``saml_id``, creating the association
    (and the user) when needed.

    Lookup order: an existing ``AuthData`` row with key "saml" and value
    ``saml_id``; else an existing user with the same email, who gets a new
    association; else a brand-new user built from ``user_attributes``, for
    whom the register email and the registration signal are sent.

    Note that ``user_attributes['username']`` is overwritten in place with
    a uniquely slugified value when a new user is created.

    If ``token`` is given, the membership it identifies is attached to the
    user; ``exc.IntegrityError`` is raised if saving it hits a database
    ``IntegrityError``.
    """
    AuthData = apps.get_model('users', 'AuthData')
    User = apps.get_model('users', 'User')

    try:
        # Does a SAML association already exist?
        user = AuthData.objects.get(key="saml", value=saml_id).user
    except AuthData.DoesNotExist:
        try:
            # Is there a user with the same email as the SAML user?
            user = User.objects.get(email=user_attributes['email'])
            AuthData.objects.create(user=user, key='saml', value=saml_id, extra={})
        except User.DoesNotExist:
            # Create a new user under a unique username
            user_attributes['username'] = slugify_uniquely(
                user_attributes['username'], User, slugfield='username')
            user = User.objects.create(**user_attributes)
            AuthData.objects.create(user=user, key='saml', value=saml_id, extra={})

            send_register_email(user)
            user_registered_signal.send(sender=user.__class__, user=user)

    if token:
        membership = get_membership_by_token(token)
        try:
            membership.user = user
            membership.save(update_fields=['user'])
        except IntegrityError:
            raise exc.IntegrityError(_("This user is already a member of the project."))

    return user
def generate_thumbnails(model_name, pk, field):
    """
    Generate all thumbnail aliases (global ones included) for the file held
    in attribute ``field`` of the ``model_name`` instance with primary key
    ``pk``. Aliases are loaded via ``load_aliases()`` first.
    """
    from .utils import load_aliases

    load_aliases()
    instance = apps.get_model(model_name).objects.get(pk=pk)
    generate_all_aliases(getattr(instance, field), include_global=True)
def generate_thumbnail(model_name, pk, field, opts):
    """
    Build one thumbnail, described by ``opts``, for the file held in
    attribute ``field`` of the ``model_name`` instance with primary key
    ``pk``, and return the thumbnail's ``url``.
    """
    instance = apps.get_model(model_name).objects.get(pk=pk)
    thumbnailer = get_thumbnailer(getattr(instance, field))
    return thumbnailer.get_thumbnail(opts).url
def commit(self, consumer, message):
    """
    Persist the position after ``message`` for its topic and partition.

    The ``KafkaOffset`` row for ``(message.topic, message.partition)`` is
    fetched or created, and its ``offset`` is set to
    ``message.offset + 1`` before saving.
    """
    KafkaOffset = apps.get_model(app_label='logpipe', model_name='KafkaOffset')
    log_line = 'Commit offset "%s" for topic "%s", partition "%s" to %s' % (
        message.offset, message.topic, message.partition, self.__class__.__name__)
    logger.debug(log_line)

    record, _created = KafkaOffset.objects.get_or_create(
        topic=message.topic,
        partition=message.partition)
    record.offset = message.offset + 1
    record.save()
def seek(self, consumer, topic, partition):
    """
    Position ``consumer`` on ``topic``/``partition`` at the stored offset.

    When no ``KafkaOffset`` row exists for the pair, the consumer is moved
    to the beginning of the partition instead.
    """
    KafkaOffset = apps.get_model(app_label='logpipe', model_name='KafkaOffset')
    tp = kafka.TopicPartition(topic=topic, partition=partition)

    try:
        stored = KafkaOffset.objects.get(topic=topic, partition=partition)
    except KafkaOffset.DoesNotExist:
        logger.debug('Seeking to beginning of topic "%s", partition "%s"' % (topic, partition))
        consumer.client.seek_to_beginning(tp)
        return

    logger.debug('Seeking to offset "%s" on topic "%s", partition "%s"' % (stored.offset, topic, partition))
    consumer.client.seek(tp, stored.offset)
def commit(self, consumer, message):
    """
    Persist the sequence number of ``message`` for its stream and shard.

    The ``KinesisOffset`` row for ``(message.topic, message.partition)``,
    stored as ``stream``/``shard``, is fetched or created, and its
    ``sequence_number`` is set to ``message.offset`` before saving.
    """
    KinesisOffset = apps.get_model(app_label='logpipe', model_name='KinesisOffset')
    log_line = 'Commit offset "%s" for stream "%s", shard "%s" to %s' % (
        message.offset, message.topic, message.partition, self.__class__.__name__)
    logger.debug(log_line)

    record, _created = KinesisOffset.objects.get_or_create(
        stream=message.topic,
        shard=message.partition)
    record.sequence_number = message.offset
    record.save()
def seek(self, consumer, stream, shard):
    """
    Position ``consumer`` on ``shard`` of ``stream`` at the stored sequence
    number.

    When no ``KinesisOffset`` row exists for the pair, the consumer is
    asked to seek to sequence number ``None`` instead.
    """
    KinesisOffset = apps.get_model(app_label='logpipe', model_name='KinesisOffset')

    try:
        stored = KinesisOffset.objects.get(stream=stream, shard=shard)
    except KinesisOffset.DoesNotExist:
        logger.debug('Seeking to beginning of stream "%s", partition "%s"' % (stream, shard))
        consumer.seek_to_sequence_number(shard, None)
        return

    logger.debug('Seeking to offset "%s" on stream "%s", partition "%s"' % (stored.sequence_number, stream, shard))
    consumer.seek_to_sequence_number(shard, stored.sequence_number)
def run(self):
    """
    Create, save and return a ``Build`` of type 'scheduled' for this
    schedule, using its branch, the branch's repo, its plan, and the sha of
    the branch's current commit as reported by ``branch.github_api``.
    """
    Build = apps.get_model('build', 'Build')
    branch = self.branch
    build = Build(
        repo=branch.repo,
        plan=self.plan,
        branch=branch,
        commit=branch.github_api.commit.sha,
        schedule=self,
        build_type='scheduled',
    )
    build.save()
    return build