我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db.models.Manager()。
def prepare_field(self, obj, field): if isinstance(field, SearchField): yield (unidecode(self.prepare_value(field.get_value(obj))), get_weight(field.boost)) elif isinstance(field, RelatedFields): sub_obj = getattr(obj, field.field_name) if sub_obj is None: return if callable(sub_obj): sub_obj = sub_obj() if isinstance(sub_obj, Manager): sub_objs = sub_obj.all() else: sub_objs = [sub_obj] for sub_obj in sub_objs: for sub_field in field.fields: for value in self.prepare_field(sub_obj, sub_field): yield value
def search(self, terms): """ Produces an icontains lookup on the question title. """ query = None # Query to search for every search term # Build the query with Q and icontains for term in normalize_query(terms): q = models.Q(text__icontains=term) | models.Q(details__icontains=term) query = q if query is None else query & q return self.filter(query) ########################################################################## ## Questions Manager ##########################################################################
def deconstruct(self): kwargs = { 'name': self.name, 'fields': self.fields, } if self.options: kwargs['options'] = self.options if self.bases and self.bases != (models.Model,): kwargs['bases'] = self.bases if self.managers and self.managers != [('objects', models.Manager())]: kwargs['managers'] = self.managers return ( self.__class__.__name__, [], kwargs )
def smart_repr(value): if isinstance(value, models.Manager): return manager_repr(value) if isinstance(value, Promise) and value._delegate_text: value = force_text(value) value = unicode_repr(value) # Representations like u'help text' # should simply be presented as 'help text' if value.startswith("u'") and value.endswith("'"): return value[1:] # Representations like # <django.core.validators.RegexValidator object at 0x1047af050> # Should be presented as # <django.core.validators.RegexValidator object> value = re.sub(' at 0x[0-9A-Fa-f]{4,32}>', '>', value) return value
def to_representation(self, data): if not data: return data if 'request' not in self.context: progress_dict = {} else: kwargs = self.context['view'].kwargs progress_dict, last_active_dict = get_progress_and_last_active(data, **kwargs) # Dealing with nested relationships, data can be a Manager, # so, first get a queryset from the Manager if needed iterable = data.all() if isinstance(data, Manager) else data return [ self.child.to_representation( item, progress=progress_dict.get(item.content_id), last_active=last_active_dict.get(item.content_id)) for item in iterable ]
def to_representation(self, data): if not data: return data if 'request' not in self.context or not self.context['request'].user.is_facility_user: progress_dict = {} else: user = self.context["request"].user # Don't annotate topic progress as too expensive progress_dict = get_topic_and_content_progress_fractions(data, user) # Dealing with nested relationships, data can be a Manager, # so, first get a queryset from the Manager if needed iterable = data.all() if isinstance(data, Manager) else data return [ self.child.to_representation( item, progress_fraction=progress_dict.get(item.content_id, 0.0), annotate_progress_fraction=False ) for item in iterable ]
def test_base_manager(self): def show_base_manager(model): return "{0} {1}".format( repr(type(model._base_manager)), repr(model._base_manager.model) ) self.assertEqual(show_base_manager(PlainA), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainA'>") self.assertEqual(show_base_manager(PlainB), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainB'>") self.assertEqual(show_base_manager(PlainC), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainC'>") self.assertEqual(show_base_manager(Model2A), "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2A'>") self.assertEqual(show_base_manager(Model2B), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.Model2B'>") self.assertEqual(show_base_manager(Model2C), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.Model2C'>") self.assertEqual(show_base_manager(One2OneRelatingModel), "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.One2OneRelatingModel'>") self.assertEqual(show_base_manager(One2OneRelatingModelDerived), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.One2OneRelatingModelDerived'>")
def test_instance_default_manager(self): def show_default_manager(instance): return "{0} {1}".format( repr(type(instance._default_manager)), repr(instance._default_manager.model) ) plain_a = PlainA(field1='C1') plain_b = PlainB(field2='C1') plain_c = PlainC(field3='C1') model_2a = Model2A(field1='C1') model_2b = Model2B(field2='C1') model_2c = Model2C(field3='C1') self.assertEqual(show_default_manager(plain_a), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainA'>") self.assertEqual(show_default_manager(plain_b), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainB'>") self.assertEqual(show_default_manager(plain_c), "<class 'django.db.models.manager.Manager'> <class 'polymorphic.tests.PlainC'>") self.assertEqual(show_default_manager(model_2a), "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2A'>") self.assertEqual(show_default_manager(model_2b), "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2B'>") self.assertEqual(show_default_manager(model_2c), "<class 'polymorphic.managers.PolymorphicManager'> <class 'polymorphic.tests.Model2C'>")
def test_queryset_assignment(self): # This is just a consistency check for now, testing standard Django behavior. parent = PlainParentModelWithManager.objects.create() child = PlainChildModelWithManager.objects.create(fk=parent) self.assertIs(type(PlainParentModelWithManager._default_manager), models.Manager) self.assertIs(type(PlainChildModelWithManager._default_manager), PlainMyManager) self.assertIs(type(PlainChildModelWithManager.objects), PlainMyManager) self.assertIs(type(PlainChildModelWithManager.objects.all()), PlainMyManagerQuerySet) # A related set is created using the model's _default_manager, so does gain extra methods. self.assertIs(type(parent.childmodel_set.my_queryset_foo()), PlainMyManagerQuerySet) # For polymorphic models, the same should happen. parent = ParentModelWithManager.objects.create() child = ChildModelWithManager.objects.create(fk=parent) self.assertIs(type(ParentModelWithManager._default_manager), PolymorphicManager) self.assertIs(type(ChildModelWithManager._default_manager), MyManager) self.assertIs(type(ChildModelWithManager.objects), MyManager) self.assertIs(type(ChildModelWithManager.objects.my_queryset_foo()), MyManagerQuerySet) # A related set is created using the model's _default_manager, so does gain extra methods. self.assertIs(type(parent.childmodel_set.my_queryset_foo()), MyManagerQuerySet)
def __getattr__(self, name): value = getattr(self._obj, name) if isinstance(value, Manager): value = self._wrap_manager(value) elif isinstance(value, type) and issubclass(value, models.Model) and value._meta.app_label == 'mapdata': value = self._wrap_model(value) elif isinstance(value, models.Model) and value._meta.app_label == 'mapdata': value = self._wrap_instance(value) elif isinstance(value, type) and issubclass(value, Exception): pass elif callable(value) and name not in self._allowed_callables: if name in self._wrapped_callables: func = getattr(self._obj.__class__, name) @wraps(func) def wrapper(*args, **kwargs): return func(self, *args, **kwargs) return wrapper if isinstance(self, (ModelInstanceWrapper, ModelWrapper)) and not hasattr(models.Model, name): return value raise TypeError('Can not call %s.%s wrapped!' % (type(self), name)) return value
def count_votes(self): """ Returns questions annotated with the number of votes they have. """ return self.annotate(num_votes=Coalesce(models.Sum('votes__vote'), 0)) ########################################################################## ## Answer Manager ##########################################################################
def lexical_ordering(self): """ Returns a case-insensitive lexical ordering of the tags. """ return self.order_by(Lower('text')) ########################################################################## ## Tag Manager ##########################################################################
def to_csv(self): """ QuerySet methods must be mirrored on the Manager. """ return self.get_queryset().to_csv()
def test_translate_model_with_custom_manager(self): ''' Verify the MultiLingualManager gets mixed in properly ''' class CustomQuerySet(models.query.QuerySet): pass class CustomManager(models.Manager): def get_queryset(self): return CustomQuerySet() def custom_method(self): return 'foo' class TestModel1(models.Model): name = models.CharField(max_length=100) i18n = TranslationField(fields=('name', )) objects = CustomManager() class Meta: app_label = 'django-modeltrans_tests' translate_model(TestModel1) self.assertIsInstance(TestModel1.objects, CustomManager) self.assertIsInstance(TestModel1.objects, MultilingualManager) self.assertEquals(TestModel1.objects.custom_method(), 'foo') self.assertIsInstance(TestModel1.objects.all(), MultilingualQuerySet)
def has_custom_queryset(manager): ''' Check whether manager (or its parents) has declared some custom get_queryset method. ''' return getattr(manager, 'get_queryset', None) != getattr(Manager, 'get_queryset', None)
def get_document(self, obj): # Build document doc = dict(pk=str(obj.pk), content_type=self.model.indexed_get_content_type()) partials = [] for field in self.model.get_search_fields(): value = field.get_value(obj) if isinstance(field, RelatedFields): if isinstance(value, models.Manager): nested_docs = [] for nested_obj in value.all(): nested_doc, extra_partials = self._get_nested_document(field.fields, nested_obj) nested_docs.append(nested_doc) partials.extend(extra_partials) value = nested_docs elif isinstance(value, models.Model): value, extra_partials = self._get_nested_document(field.fields, value) partials.extend(extra_partials) doc[self.get_field_column_name(field)] = value # Check if this field should be added into _partials if isinstance(field, SearchField) and field.partial_match: partials.append(value) # Add partials to document doc['_partials'] = partials return doc
def get_queryset(self): """ Returns a custom :class:`QuerySet` which provides the CTE functionality for all queries concerning :class:`CTENode` objects. This method overrides the default :meth:`get_queryset` method of the :class:`Manager` class. :returns: a custom :class:`QuerySet` which provides the CTE functionality for all queries concerning :class:`CTENode` objects. """ # The CTEQuerySet uses _cte_node_* attributes from the Model, so ensure # they exist. self._ensure_parameters() return CTEQuerySet(self.model, using=self._db)
def get_queryset(self): queryset = self.queryset if isinstance(queryset, (QuerySet, Manager)): # Ensure queryset is re-evaluated whenever used. # Note that actually a `Manager` class may also be used as the # queryset argument. This occurs on ModelSerializer fields, # as it allows us to generate a more expressive 'repr' output # for the field. # Eg: 'MyRelationship(queryset=ExampleModel.objects.all())' queryset = queryset.all() return queryset
def to_representation(self, data): """ List of object instances -> List of dicts of primitive datatypes. """ # Dealing with nested relationships, data can be a Manager, # so, first get a queryset from the Manager if needed iterable = data.all() if isinstance(data, models.Manager) else data return [ self.child.to_representation(item) for item in iterable ]
def diff_field(self, name, old, new, **kwargs): old_val = getattr(old, name, None) new_val = getattr(new, name, None) if old_val == new_val: return None try: field = self.model._meta.get_field(name) except models.FieldDoesNotExist: field = None if isinstance(field, models.ForeignKey): return diff_model_instances(old_val, new_val, model=field.rel.to, **kwargs) elif isinstance(new_val, (Manager, QuerySet)) or isinstance(old_val, (Manager, QuerySet)): old_val = list(old_val.all()) if old else [] new_val = list(new_val.all()) if new else [] if not old_val and not new_val: return None return ListDiffNode(old_val, new_val, **kwargs) elif field is not None and field.choices: old_val = str(dict(field.choices)[old_val]) if old_val else _('No Information') new_val = str(dict(field.choices)[new_val]) if new_val else _('No Information') return AtomicDiffNode(old_val, new_val, **kwargs) elif isinstance(field, (models.CharField, models.TextField)) and old_val and new_val: return TextDiffNode(old_val, new_val, **kwargs) else: return AtomicDiffNode(_render_value(old_val), _render_value(new_val), **kwargs)
def copy_pastable_to_string(model, excluded: List[str] = None): model_name = model.__class__.__name__ field_names = [f.name for f in model._meta.get_fields() if f.name not in (excluded or [])] values = [(f, getattr(model, f)) for f in field_names] simple_values = [(k, v) for (k, v) in values if not isinstance(v, Manager)] simple_initializers = ["{}={}".format(k, repr(v)) for (k, v) in simple_values if v is not None] manager_values = [(k, list(v.all())) for (k, v) in values if isinstance(v, Manager)] model_adders = ["{}.{}.add({})".format(model_name, k, ",".join([str(e) for e in v])) for (k, v) in manager_values if v] return "{}.objects.create(\n{}\n)\n{}".format(model_name, ",\n".join(simple_initializers), "\n".join(model_adders))
def _prepare(cls): """ Creates some methods once self._meta has been populated. """ opts = cls._meta opts._prepare(cls) if opts.order_with_respect_to: cls.get_next_in_order = curry(cls._get_next_or_previous_in_order, is_next=True) cls.get_previous_in_order = curry(cls._get_next_or_previous_in_order, is_next=False) # Defer creating accessors on the foreign class until it has been # created and registered. If remote_field is None, we're ordering # with respect to a GenericForeignKey and don't know what the # foreign class is - we'll add those accessors later in # contribute_to_class(). if opts.order_with_respect_to.remote_field: wrt = opts.order_with_respect_to remote = wrt.remote_field.model lazy_related_operation(make_foreign_order_accessors, cls, remote) # Give the class a docstring -- its definition. if cls.__doc__ is None: cls.__doc__ = "%s(%s)" % (cls.__name__, ", ".join(f.name for f in opts.fields)) get_absolute_url_override = settings.ABSOLUTE_URL_OVERRIDES.get(opts.label_lower) if get_absolute_url_override: setattr(cls, 'get_absolute_url', get_absolute_url_override) if not opts.managers or cls._requires_legacy_default_manager(): if any(f.name == 'objects' for f in opts.fields): raise ValueError( "Model %s must specify a custom Manager, because it has a " "field named 'objects'." % cls.__name__ ) manager = Manager() manager.auto_created = True cls.add_to_class('objects', manager) signals.class_prepared.send(sender=cls)
def test_user_defined_manager(self): self.create_model2abcd() ModelWithMyManager.objects.create(field1='D1a', field4='D4a') ModelWithMyManager.objects.create(field1='D1b', field4='D4b') objects = ModelWithMyManager.objects.all() # MyManager should reverse the sorting of field1 self.assertEqual(repr(objects[0]), '<ModelWithMyManager: id 6, field1 (CharField) "D1b", field4 (CharField) "D4b">') self.assertEqual(repr(objects[1]), '<ModelWithMyManager: id 5, field1 (CharField) "D1a", field4 (CharField) "D4a">') self.assertEqual(len(objects), 2) self.assertIs(type(ModelWithMyManager.objects), MyManager) self.assertIs(type(ModelWithMyManager._default_manager), MyManager) self.assertIs(type(ModelWithMyManager.base_objects), models.Manager)
def test_user_defined_manager_as_secondary(self): self.create_model2abcd() ModelWithMyManagerNoDefault.objects.create(field1='D1a', field4='D4a') ModelWithMyManagerNoDefault.objects.create(field1='D1b', field4='D4b') objects = ModelWithMyManagerNoDefault.my_objects.all() # MyManager should reverse the sorting of field1 self.assertEqual(repr(objects[0]), '<ModelWithMyManagerNoDefault: id 6, field1 (CharField) "D1b", field4 (CharField) "D4b">') self.assertEqual(repr(objects[1]), '<ModelWithMyManagerNoDefault: id 5, field1 (CharField) "D1a", field4 (CharField) "D4a">') self.assertEqual(len(objects), 2) self.assertIs(type(ModelWithMyManagerNoDefault.my_objects), MyManager) self.assertIs(type(ModelWithMyManagerNoDefault.objects), PolymorphicManager) self.assertIs(type(ModelWithMyManagerNoDefault._default_manager), PolymorphicManager) self.assertIs(type(ModelWithMyManagerNoDefault.base_objects), models.Manager)
def test_user_objects_manager_as_secondary(self): self.create_model2abcd() ModelWithMyManagerDefault.objects.create(field1='D1a', field4='D4a') ModelWithMyManagerDefault.objects.create(field1='D1b', field4='D4b') self.assertIs(type(ModelWithMyManagerDefault.my_objects), MyManager) self.assertIs(type(ModelWithMyManagerDefault.objects), PolymorphicManager) self.assertIs(type(ModelWithMyManagerDefault._default_manager), MyManager) self.assertIs(type(ModelWithMyManagerDefault.base_objects), models.Manager)
def _wrap_manager(self, manager): """ Wrap a manager with same changeset as this wrapper. Detects RelatedManager or ManyRelatedmanager instances and chooses the Wrapper accordingly. """ assert isinstance(manager, Manager) if hasattr(manager, 'through'): return ManyRelatedManagerWrapper(self._changeset, manager) if hasattr(manager, 'instance'): return RelatedManagerWrapper(self._changeset, manager) return ManagerWrapper(self._changeset, manager)
def get_queryset(func): """ Wraps methods of BaseQueryWrapper that manipulate a queryset. If self is a Manager, not an object, preceed the method call with a filter call according to the manager. """ @wraps(func) def wrapper(self, *args, **kwargs): if hasattr(self, 'get_queryset'): return getattr(self.get_queryset(), func.__name__)(*args, **kwargs) return func(self, *args, **kwargs) return wrapper
def _prepare(cls): """ Creates some methods once self._meta has been populated. """ opts = cls._meta opts._prepare(cls) if opts.order_with_respect_to: cls.get_next_in_order = curry(cls._get_next_or_previous_in_order, is_next=True) cls.get_previous_in_order = curry(cls._get_next_or_previous_in_order, is_next=False) # Defer creating accessors on the foreign class until it has been # created and registered. If remote_field is None, we're ordering # with respect to a GenericForeignKey and don't know what the # foreign class is - we'll add those accessors later in # contribute_to_class(). if opts.order_with_respect_to.remote_field: wrt = opts.order_with_respect_to remote = wrt.remote_field.model lazy_related_operation(make_foreign_order_accessors, cls, remote) # Give the class a docstring -- its definition. if cls.__doc__ is None: cls.__doc__ = "%s(%s)" % (cls.__name__, ", ".join(f.name for f in opts.fields)) get_absolute_url_override = settings.ABSOLUTE_URL_OVERRIDES.get(opts.label_lower) if get_absolute_url_override: setattr(cls, 'get_absolute_url', get_absolute_url_override) if not opts.managers or cls._requires_legacy_default_manager(): if any(f.name == 'objects' for f in opts.fields): raise ValueError( "Model %s must specify a custom Manager, because it has a " "field named 'objects'." % cls.__name__ ) manager = Manager() manager.auto_created = True cls.add_to_class('objects', manager) class_prepared.send(sender=cls)
def to_representation(self, data): iterable = data.all() if isinstance(data, models.Manager) else data resp = [] for item in iterable: instance, plugin = item.get_plugin_instance() serializer = get_serializer(instance, plugin=plugin) resp.append(serializer.data) return resp # TODO: Check image plugin data serializer # TODO: decide if we need to return url for images with domain name or not, cdn?
def democratic(self): """ Filters the annotations for only democratic annotations. """ return self.filter(label__slug='democratic') ########################################################################## ## Corpus Manager ##########################################################################
def get_followed_relations(self, obj): """Returns an iterable of related models that should be included in the revision data.""" for relationship in self.follow: # Clear foreign key cache. try: related_field = obj._meta.get_field(relationship) except models.FieldDoesNotExist: pass else: if isinstance(related_field, models.ForeignKey): if hasattr(obj, related_field.get_cache_name()): delattr(obj, related_field.get_cache_name()) # Get the referenced obj(s). try: related = getattr(obj, relationship) except ObjectDoesNotExist: continue if isinstance(related, models.Model): yield related elif isinstance(related, (models.Manager, QuerySet)): for related_obj in related.all(): yield related_obj elif related is not None: raise TypeError("Cannot follow the relationship {relationship}. Expected a model or QuerySet, found {related}".format( relationship = relationship, related = related, ))
def get_query_set(self): """Redefine default query set to exclude archived tasks.""" return models.Manager.get_query_set(self).filter(archive=False)
def coerce_result(self, values): if values is None: return None if isinstance(values, Manager): values = values.all() elif isinstance(values, six.string_types): values = [values] if issubclass(self.type_, Scalar): return [self.type_.coerce_result(value) for value in list(values)] return list(values)
def get_value(self): values = super(RelatedField, self).get_value() if values is None: return None if isinstance(values, Manager): values = values.all() return [ self._serialize_value(value) for value in values ]
def add_manager(model): ''' Monkey patches the original model to use MultilingualManager instead of default managers (not only ``objects``, but also every manager defined and inherited). Custom managers are merged with MultilingualManager. ''' if model._meta.abstract: return def patch_manager_class(manager): if isinstance(manager, MultilingualManager): return if manager.__class__ is Manager: manager.__class__ = MultilingualManager else: class NewMultilingualManager(MultilingualManager, manager.__class__): use_for_related_fields = getattr( manager.__class__, 'use_for_related_fields', not has_custom_queryset(manager)) _old_module = manager.__module__ _old_class = manager.__class__.__name__ def deconstruct(self): return ( False, # as_manager '%s.%s' % (self._old_module, self._old_class), # manager_class None, # qs_class self._constructor_args[0], # args self._constructor_args[1], # kwargs ) manager.__class__ = NewMultilingualManager managers = model._meta.local_managers for current_manager in managers: prev_class = current_manager.__class__ patch_manager_class(current_manager) if model._default_manager.__class__ is prev_class: # Normally model._default_manager is a reference to one of model's managers # (and would be patched by the way). # However, in some rare situations (mostly proxy models) # model._default_manager is not the same instance as one of managers, but it # share the same class. model._default_manager.__class__ = current_manager.__class__ patch_manager_class(model._base_manager) if hasattr(model._meta, '_expire_cache'): model._meta._expire_cache()
def to_representation(self, data): # Dealing with nested relationships, data can be a Manager, # so, first get a queryset from the Manager if needed data = data.all() if isinstance(data, Manager) else data # initialize cache key cache_key = None # ensure that we are filtering by the parent only # this allows us to only cache results on the learn page from .api import ContentNodeFilter pure_parent_query = "parent" in self.context['request'].GET and \ not any(field in self.context['request'].GET for field in ContentNodeFilter.Meta.fields if field != "parent") # Cache parent look ups only if pure_parent_query: cache_key = 'contentnode_list_{parent}'.format( parent=self.context['request'].GET.get('parent')) if cache.get(cache_key): return cache.get(cache_key) if not data: return data if 'request' not in self.context or not self.context['request'].user.is_facility_user: progress_dict = {} else: user = self.context["request"].user # Don't annotate topic progress as too expensive progress_dict = get_content_progress_fractions(data, user) result = [] topic_only = True # Allow results to be limited after all queryset filtering has occurred if self.limit: data = data[:self.limit] for item in data: obj = self.child.to_representation( item, progress_fraction=progress_dict.get(item.content_id), annotate_progress_fraction=False ) topic_only = topic_only and obj.get('kind') == content_kinds.TOPIC result.append(obj) # Only store if all nodes are topics, because we don't annotate progress on them # This has the happy side effect of not caching our dynamically calculated # recommendation queries, which might change for the same user over time # because they do not return topics if topic_only and pure_parent_query: cache.set(cache_key, result, 60 * 10) return result
def base_manager(self): base_manager_name = self.base_manager_name if not base_manager_name: # Get the first parent's base_manager_name if there's one. for parent in self.model.mro()[1:]: if hasattr(parent, '_meta'): if parent._base_manager.name != '_base_manager': base_manager_name = parent._base_manager.name break if base_manager_name: try: return self.managers_map[base_manager_name] except KeyError: raise ValueError( "%s has no manager named %r" % ( self.object_name, base_manager_name, ) ) # Deprecation shim for `use_for_related_fields`. for i, base_manager_class in enumerate(self.default_manager.__class__.mro()): if getattr(base_manager_class, 'use_for_related_fields', False): if not getattr(base_manager_class, 'silence_use_for_related_fields_deprecation', False): warnings.warn( "use_for_related_fields is deprecated, instead " "set Meta.base_manager_name on '{}'.".format(self.model._meta.label), RemovedInDjango20Warning, 2 ) if i == 0: manager = self.default_manager else: manager = base_manager_class() manager.name = '_base_manager' manager.model = self.model return manager manager = Manager() manager.name = '_base_manager' manager.model = self.model manager.auto_created = True return manager
def get_inherited_managers(self, attrs): """ Return list of all managers to be inherited/propagated from the base classes; use correct mro, only use managers with _inherited==False (they are of no use), skip managers that are overwritten by the user with same-named class attributes (in attrs) """ # print "** ", self.__name__ add_managers = [] add_managers_keys = set() for base in self.__mro__[1:]: if not issubclass(base, models.Model): continue if not getattr(base, 'polymorphic_model_marker', None): continue # leave managers of non-polym. models alone for key, manager in base.__dict__.items(): if type(manager) == models.manager.ManagerDescriptor: manager = manager.manager if AbstractManagerDescriptor is not None: # Django 1.4 unconditionally assigned managers to a model. As of Django 1.5 however, # the abstract models don't get any managers, only a AbstractManagerDescriptor as substitute. # Pretend that the manager is still there, so all code works like it used to. if type(manager) == AbstractManagerDescriptor and base.__name__ == 'PolymorphicModel': model = manager.model if key == 'objects': manager = PolymorphicManager() manager.model = model elif key == 'base_objects': manager = models.Manager() manager.model = model if not isinstance(manager, models.Manager): continue if key == '_base_manager': continue # let Django handle _base_manager if key in attrs: continue if key in add_managers_keys: continue # manager with that name already added, skip if manager._inherited: continue # inherited managers (on the bases) have no significance, they are just copies # print '## {0} {1}'.format(self.__name__, key) if isinstance(manager, PolymorphicManager): # validate any inherited polymorphic managers self.validate_model_manager(manager, self.__name__, key) add_managers.append((base.__name__, key, manager)) add_managers_keys.add(key) # The ordering in the base.__dict__ may randomly change depending on which method is added. # Make sure base_objects is on top, and 'objects' and '_default_manager' follow afterwards. # This makes sure that the _base_manager is also assigned properly. add_managers = sorted(add_managers, key=lambda item: (item[1].startswith('_'), item[1])) return add_managers
def test_run_python(self): """ Because this can run arbitrary python code, we can't know which parts of it need to run against each schema, and which parts run against the public schema. We could hack into any generated SQL, and inspect it, looking for table names, attempting to push data to the correct schemata (including executing the SQL multiple times if necessary). Maybe we could fuck with the models generated by project_state.render(), and make their generated SQL do what we need it to do. Although, it looks like Pony.objects is a normal models.Manager class. """ project_state = self.set_up_test_model() def forwards(models, schema_editor): Pony = models.get_model('tests', 'Pony') Pony.objects.create(pink=1, weight=3.55) Pony.objects.create(weight=5) def backwards(models, schema_editor): Pony = models.get_model('tests', 'Pony') Pony.objects.filter(pink=1, weight=3.55).delete() Pony.objects.filter(weight=5).delete() operation = migrations.RunPython(forwards, reverse_code=backwards) new_state = project_state.clone() operation.state_forwards('tests', new_state) Pony = project_state.apps.get_model('tests', 'Pony') @all_schemata def pony_count(count, **kwargs): found = Pony.objects.count() self.assertEqual( count, found, 'Incorrect number of Ponies found in schema ' '{schema}: expected {0}, found {1}'.format(count, found, **kwargs) ) pony_count(0) with connection.schema_editor() as editor: operation.database_forwards('tests', editor, project_state, new_state) pony_count(2) with connection.schema_editor() as editor: operation.database_backwards('tests', editor, new_state, project_state) pony_count(0)