我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db.models.query.QuerySet()。
def _get_queryset(klass):
    """
    Normalise ``klass`` into a QuerySet.

    A QuerySet is returned unchanged; a Manager yields ``manager.all()``; a
    model class yields ``_default_manager.all()``. Exists so that
    get_object_or_404 and get_list_or_404 can share this logic.

    Raises ValueError for any other kind of object.
    """
    if isinstance(klass, QuerySet):
        return klass
    if isinstance(klass, Manager):
        return klass.all()
    if isinstance(klass, ModelBase):
        return klass._default_manager.all()

    # Report the class name whether we were handed a class or an instance.
    if isinstance(klass, type):
        type_name = klass.__name__
    else:
        type_name = klass.__class__.__name__
    raise ValueError("Object is of type '%s', but must be a Django Model, "
                     "Manager, or QuerySet" % type_name)
def get_object_or_404(klass, *args, **kwargs):
    """
    Fetch a single object with get(), raising Http404 when none matches.

    klass may be a Model, Manager, or QuerySet object. Every other positional
    and keyword argument is forwarded to the get() query.

    Note: as with get(), any other exception raised by the query (for example
    MultipleObjectsReturned when several rows match) propagates unchanged.
    """
    queryset = _get_queryset(klass)
    try:
        match = queryset.get(*args, **kwargs)
    except queryset.model.DoesNotExist:
        model_name = queryset.model._meta.object_name
        raise Http404('No %s matches the given query.' % model_name)
    return match
def process_rhs(self, compiler, connection):
    """
    Turn the right-hand side of the lookup into SQL.

    When the value can be compiled, returns ``('(' + sql + ')', params)``;
    otherwise returns whatever ``get_db_prep_lookup`` produces for it.
    """
    rhs_value = self.rhs
    if self.bilateral_transforms:
        if self.rhs_is_direct_value():
            # Do not call get_db_prep_lookup here: the value is transformed
            # before it is used for the lookup.
            rhs_value = Value(rhs_value, output_field=self.lhs.output_field)
        rhs_value = self.apply_bilateral_transforms(rhs_value)
        rhs_value = rhs_value.resolve_expression(compiler.query)

    # For historical reasons there are several ways to produce SQL here:
    # get_compiler is likely a Query instance, _as_sql a QuerySet, and as_sql
    # just something exposing as_sql. The value may also be a plain Python
    # value.
    if hasattr(rhs_value, 'get_compiler'):
        rhs_value = rhs_value.get_compiler(connection=connection)

    if hasattr(rhs_value, 'as_sql'):
        sql, params = compiler.compile(rhs_value)
    elif hasattr(rhs_value, '_as_sql'):
        sql, params = rhs_value._as_sql(connection=connection)
    else:
        return self.get_db_prep_lookup(rhs_value, connection)
    return '(' + sql + ')', params
def test_queryset_multiple(self):
    """
    Multiple filter parameters are ANDed together; check that this also
    holds when one of them filters by QuerySet number ('#').
    """
    filtered = self._get_qss().filter(
        **{'#__gt': 0, 'title__gt': 'Django Rocks'})
    titles = [item.title for item in filtered]
    # Some of the Articles and the BlogPosts.
    self.assertEqual(titles, ['Some Article', 'Post'])

    # This would only look at Articles and BlogPosts, but neither of those
    # have a title > "Some Article."
    filtered = self._get_qss().filter(
        **{'#__gt': 0, 'title__gt': 'Some Article'})
    titles = [item.title for item in filtered]
    self.assertEqual(titles, [])
def get_item_by_key(passed_list, key, value):
    """
    Select the entries of ``passed_list`` whose ``key`` equals ``value``.

    ``passed_list`` is either a QuerySet / PolymorphicQuerySet (filtered with
    ``.filter(**{key: value})``) or an iterable of dict-like items (compared
    through ``item.get(key)``). Both ``key`` and ``value`` may be variables.

    Returns None when ``value`` is None or the empty string, the single
    matching item when exactly one matches, and otherwise the whole collection
    of matches (possibly empty).
    """
    if value in [None, '']:
        return None

    # isinstance() rather than an exact type() comparison, so that subclasses
    # of the queryset classes are filtered in the database instead of falling
    # through to the dict-style branch below.
    if isinstance(passed_list, (QuerySet, PolymorphicQuerySet)):
        matches = passed_list.filter(**{key: value})
    else:
        matches = [item for item in passed_list if item.get(key) == value]

    if len(matches) == 1:
        return matches[0]
    return matches
def resolve_connection(cls, connection, default_manager, args, iterable):
    """
    Build a connection object over ``iterable``.

    Falls back to ``default_manager`` when ``iterable`` is None. A QuerySet
    that is not the default manager itself is merged with the default
    queryset, and its length comes from ``count()``; anything else is measured
    with ``len()``. The resulting connection gets ``iterable`` and ``length``
    attributes attached before it is returned.
    """
    source = default_manager if iterable is None else iterable
    source = maybe_queryset(source)

    if not isinstance(source, QuerySet):
        total = len(source)
    else:
        if source is not default_manager:
            base_queryset = maybe_queryset(default_manager)
            source = cls.merge_querysets(base_queryset, source)
        total = source.count()

    result = connection_from_list_slice(
        source,
        args,
        slice_start=0,
        list_length=total,
        list_slice_length=total,
        connection_type=connection,
        edge_type=connection.Edge,
        pageinfo_type=PageInfo,
    )
    result.iterable = source
    result.length = total
    return result
def get_searched_queryset(self, qs):
    """
    Narrow ``qs`` to rows matching every word of the "term" request parameter.

    Each word must match at least one of the model's autocomplete search
    fields (the per-field conditions are ORed, the per-word querysets are
    ANDed into ``qs``). When the model declares no search fields an empty
    queryset is returned.
    """
    model = self.model
    term = self.GET["term"]

    # Models may optionally pre-process the term.
    try:
        term = model.autocomplete_term_adjust(term)
    except AttributeError:
        pass

    search_fields = get_autocomplete_search_fields(self.model)
    if not search_fields:
        return model.objects.none()

    for word in term.split():
        conditions = [
            models.Q(**{smart_text(field): smart_text(word)})
            for field in search_fields
        ]
        word_qs = QuerySet(model)
        # Carry over select_related so the combined querysets stay compatible.
        word_qs.query.select_related = qs.query.select_related
        word_qs = word_qs.filter(reduce(operator.or_, conditions))
        qs &= word_qs
    return qs
def add_fail_count_to_tasklist(tasklist):
    """
    Attach a ``count`` attribute to every task in ``tasklist``.

    ``count`` is the number of ``task_runlog`` rows whose ``tid`` equals the
    task's ``tid``; tasks without any row keep a count of 0. The tasks are
    modified in place.

    :param tasklist: iterable of objects exposing a ``tid`` attribute.
    :return: always True.
    """
    # Materialise once so one-shot iterables are handled as well.
    tasks = list(tasklist)
    if not tasks:
        return True

    task_by_tid = {}
    for task in tasks:
        task.count = 0
        task_by_tid[task.tid] = task

    tid_list = [task.tid for task in tasks]
    if len(tid_list) == 1:
        # Duplicate the only id so the parameter is a two-element tuple;
        # presumably this avoids the "(x,)" rendering of a one-element tuple
        # in the IN clause -- confirm against the database driver in use.
        tid_list *= 2
    tid_tuple = tuple(tid_list)

    cursor = connection.cursor()
    try:
        cursor.execute("SELECT `task_runlog`.`rid`, `task_runlog`.`tid`,COUNT(`task_runlog`.`rid`) AS `count` FROM `task_runlog` WHERE `task_runlog`.`tid` IN %s GROUP BY tid;", (tid_tuple,))
        rows = cursor.fetchall()
    finally:
        # Always release the cursor, even if the query fails.
        cursor.close()

    for _rid, tid, count in rows:
        task_by_tid[tid].count = int(count)
    return True
def content(self, value):
    """
    Serialise ``value`` into the response body and set the download headers.

    Empty or falsy input leaves the body empty. Lists and QuerySets (plus
    ValuesQuerySets on Django < 1.9) are serialised through the matching
    ``_serialize_*`` helper; anything else raises ValueError. Depending on
    ``self.force_csv`` the payload is emitted as CSV or as an .xlsx workbook.
    """
    # Short-circuit to protect against empty querysets/empty lists/None, etc
    if not (bool(value) and len(value)):
        self._container = []
        return

    workbook = None
    if isinstance(value, list):
        workbook = self._serialize_list(value)
    elif isinstance(value, QuerySet):
        workbook = self._serialize_queryset(value)

    if django.VERSION < (1, 9) and isinstance(value, ValuesQuerySet):
        workbook = self._serialize_values_queryset(value)

    if workbook is None:
        raise ValueError('ExcelResponse accepts the following data types: list, dict, QuerySet, ValuesQuerySet')

    if not self.force_csv:
        self['Content-Type'] = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        self['Content-Disposition'] = 'attachment; filename={}.xlsx'.format(self.output_filename)
        payload = save_virtual_workbook(workbook)
    else:
        self['Content-Type'] = 'text/csv; charset=utf8'
        self['Content-Disposition'] = 'attachment;filename={}.csv'.format(self.output_filename)
        workbook.seek(0)
        payload = self.make_bytes(workbook.getvalue())

    self._container = [self.make_bytes(payload)]
def _resolve_column(obj, column):
    """Follow a dotted attribute path on ``obj``; a segment ending in "()" is called with no arguments."""
    for segment in column.split('.'):
        if segment.endswith('()'):
            obj = getattr(obj, segment[:-2])()
        else:
            obj = getattr(obj, segment)
    return obj


def serialize_objects(django_objects, class_models, *args):
    '''
    Convert model instances into plain dictionaries of strings.

    :param django_objects: a QuerySet, or a single instance of ``class_models``
    :param class_models: the model class used to recognise a single instance
    :param args: column names to export; dotted paths ("author.name") and
        zero-argument calls ("get_status_display()") are supported
    :return: list with one ``{column: str(value)}`` dict per object (empty
        when ``django_objects`` is neither a QuerySet nor a ``class_models``
        instance)
    '''
    def to_row(instance):
        # Columns are resolved by attribute lookup instead of eval(), so a
        # column name can no longer execute arbitrary expressions.
        return {column: str(_resolve_column(instance, column))
                for column in args}

    serialize_list = []
    if isinstance(django_objects, QuerySet):
        serialize_list.extend(to_row(item) for item in django_objects)
    if isinstance(django_objects, class_models):
        serialize_list.append(to_row(django_objects))
    return serialize_list
def _get_queryset(klass):
    """
    Normalise ``klass`` into a QuerySet.

    Accepts a QuerySet (returned unchanged), a Manager, or a model class
    (whose default manager is used). Shared by get_object_or_404 and
    get_list_or_404.

    Raises ValueError for any other kind of object.
    """
    if isinstance(klass, QuerySet):
        return klass

    if isinstance(klass, Manager):
        manager = klass
    elif isinstance(klass, ModelBase):
        manager = klass._default_manager
    else:
        # Name the offending class whether we got a class or an instance.
        offending = klass if isinstance(klass, type) else klass.__class__
        raise ValueError("Object is of type '%s', but must be a Django Model, "
                         "Manager, or QuerySet" % offending.__name__)
    return manager.all()
def get_object_or_none(klass, *args, **kwargs):
    """
    Fetch a single object with get(), returning None when none matches.

    klass may be a Model, Manager, or QuerySet object. Every other positional
    and keyword argument is forwarded to the get() query.

    Note: only DoesNotExist is swallowed; any other exception raised by get()
    (for example MultipleObjectsReturned) propagates unchanged.
    """
    queryset = _get_queryset(klass)
    try:
        found = queryset.get(*args, **kwargs)
    except queryset.model.DoesNotExist:
        found = None
    return found
def queue(users, label, extra_context=None, on_site=True, sender=None):
    """
    Queue the notification in NoticeQueueBatch.

    This allows large numbers of user notifications to be deferred to a
    separate process running outside the webserver.

    :param users: a QuerySet of users, or an iterable of objects with ``pk``.
    :param label: notice label stored with every queued entry.
    :param extra_context: optional context dict (defaults to an empty dict).
    :param on_site: stored with every queued entry.
    :param sender: stored with every queued entry.

    One NoticeQueueBatch row is saved; its ``pickled_data`` is the
    base64-encoded pickle of ``(user_pk, label, extra_context, on_site,
    sender)`` tuples.
    """
    import base64

    if extra_context is None:
        extra_context = {}

    if isinstance(users, QuerySet):
        # Only the primary keys are needed, so avoid loading full rows.
        user_pks = [row["pk"] for row in users.values("pk")]
    else:
        user_pks = [user.pk for user in users]

    notices = [
        (user_pk, label, extra_context, on_site, sender)
        for user_pk in user_pks
    ]

    # ``.encode("base64")`` only exists on Python 2; base64.b64encode works on
    # both major versions. Decoding to ASCII yields text on either version.
    # NOTE(review): the output is not newline-wrapped like the old codec's;
    # standard base64 decoders accept both forms -- confirm the consumer.
    payload = base64.b64encode(pickle.dumps(notices)).decode("ascii")
    NoticeQueueBatch(pickled_data=payload).save()
def filter_by_ids(self, query, ids=None):
    """Restrict `query` to rows whose system_id is in `ids`.

    :param query: A QuerySet of Nodes.
    :type query: django.db.models.query.QuerySet_
    :param ids: Optional set of ids to filter by.  If given, nodes whose
        system_ids are not in `ids` will be ignored.
    :type ids: Sequence
    :return: `query` itself when `ids` is None, otherwise a filtered
        version of `query`.

    .. _django.db.models.query.QuerySet: https://docs.djangoproject.com/
       en/dev/ref/models/querysets/

    """
    if ids is not None:
        return query.filter(system_id__in=ids)
    return query
def get_bmc_accessible_nodes(self):
    """Return `QuerySet` of nodes that this rack controller can access.

    Collects the subnet ids of every IP address (with a non-empty `ip` and a
    subnet) on this controller's interfaces, then returns the distinct nodes
    whose BMC has an IP address on one of those subnets.
    """
    interfaces = self.interface_set.all().prefetch_related("ip_addresses")
    subnet_ids = {
        ip_address.subnet_id
        for interface in interfaces
        for ip_address in interface.ip_addresses.all()
        if ip_address.ip and ip_address.subnet_id is not None
    }
    return Node.objects.filter(
        bmc__ip_address__ip__isnull=False,
        bmc__ip_address__subnet_id__in=subnet_ids).distinct()
def test_creates_handler_with_options(self):
    """Every option passed to make_handler ends up on the handler's _meta."""
    options = dict(
        abstract=True,
        allowed_methods=["list"],
        handler_name="testing",
        queryset=Node.objects.all(),
        pk="system_id",
        fields=["hostname", "distro_series"],
        exclude=["system_id"],
        list_fields=["hostname"],
        list_exclude=["hostname"],
        non_changeable=["system_id"],
        form=sentinel.form,
    )
    handler = make_handler("TestHandler", **options)

    expected_meta = MatchesStructure(
        abstract=Is(True),
        allowed_methods=Equals(["list"]),
        handler_name=Equals("testing"),
        # The object class is taken from the queryset's model.
        object_class=Is(Node),
        queryset=IsInstance(QuerySet),
        pk=Equals("system_id"),
        fields=Equals(["hostname", "distro_series"]),
        exclude=Equals(["system_id"]),
        list_fields=Equals(["hostname"]),
        list_exclude=Equals(["hostname"]),
        non_changeable=Equals(["system_id"]),
        form=Is(sentinel.form),
    )
    self.assertThat(handler._meta, expected_meta)
def get_queryset(self):
    """
    Return the list of items for this view.

    Uses ``self.queryset`` when set (re-cloned through ``all()`` if it is a
    QuerySet), else the default manager of ``self.model``. Raises
    ImproperlyConfigured when neither is defined. The result is ordered by
    ``self.get_ordering()`` when that returns something truthy.
    """
    queryset = self.queryset
    if queryset is not None:
        if isinstance(queryset, QuerySet):
            queryset = queryset.all()
    elif self.model is not None:
        queryset = self.model._default_manager.all()
    else:
        raise ImproperlyConfigured(
            "%(cls)s is missing a QuerySet. Define "
            "%(cls)s.model, %(cls)s.queryset, or override "
            "%(cls)s.get_queryset()." % {
                'cls': self.__class__.__name__
            }
        )

    ordering = self.get_ordering()
    if not ordering:
        return queryset
    # A single field name is accepted as shorthand for a one-item tuple.
    if isinstance(ordering, six.string_types):
        ordering = (ordering,)
    return queryset.order_by(*ordering)
def process_rhs(self, compiler, connection):
    """
    Turn the right-hand side of the lookup into SQL.

    When the value can be compiled, returns ``('(' + sql + ')', params)``;
    otherwise returns whatever ``get_db_prep_lookup`` produces for it.
    """
    rhs_value = self.rhs
    if self.bilateral_transforms:
        if self.rhs_is_direct_value():
            # Do not call get_db_prep_lookup here: the value is transformed
            # before it is used for the lookup.
            prepared = self.lhs.output_field.get_db_prep_value(
                rhs_value, connection)
            rhs_value = QueryWrapper("%s", [prepared])
        rhs_value = self.apply_bilateral_transforms(rhs_value)

    # For historical reasons there are several ways to produce SQL here:
    # get_compiler is likely a Query instance, _as_sql a QuerySet, and as_sql
    # just something exposing as_sql. The value may also be a plain Python
    # value.
    if hasattr(rhs_value, 'get_compiler'):
        rhs_value = rhs_value.get_compiler(connection=connection)

    if hasattr(rhs_value, 'as_sql'):
        sql, params = compiler.compile(rhs_value)
    elif hasattr(rhs_value, '_as_sql'):
        sql, params = rhs_value._as_sql(connection=connection)
    else:
        return self.get_db_prep_lookup(rhs_value, connection)
    return '(' + sql + ')', params
def _eval(self):
    """
    Run the aggregation and record what kind of result it produced.

    Sets ``self._data_type`` to ``'qs'`` for a model-instance QuerySet or to
    ``'df'`` for a DataFrame (when pandas is available), hands the results to
    ``_split_totals`` and finally marks the object as evaluated.
    """
    results = self.aggregate(**self._params)

    try:
        is_values = isinstance(results, ValuesQuerySet)
    except NameError:  # django >= 1.9
        # ValuesQuerySet no longer exists; a values queryset is recognised by
        # its iterable class instead. Comparing ``results.__class__`` against
        # ModelIterable is always unequal (a queryset's class is never an
        # iterable class), which made the 'qs' branch below unreachable.
        # Non-queryset results have no ``_iterable_class`` and therefore
        # still count as "values".
        is_values = getattr(results, '_iterable_class', None) is not ModelIterable

    if isinstance(results, QuerySet) and not is_values:
        self._data_type = 'qs'
    elif pnd and isinstance(results, DataFrame):
        self._data_type = 'df'

    self._split_totals(results)
    self._evaluated = True
def get_list_or_404(klass, *args, **kwargs):
    """
    Return the filter() results as a list, raising Http404 when it is empty.

    klass may be a Model, Manager, or QuerySet object. Every other positional
    and keyword argument is forwarded to the filter() query.
    """
    queryset = _get_queryset(klass)
    matches = list(queryset.filter(*args, **kwargs))
    if matches:
        return matches
    raise Http404('No %s matches the given query.' % queryset.model._meta.object_name)
def __init__(self, lhs, rhs):
    """
    Store both sides of the lookup and collect bilateral transforms.

    ``rhs`` is replaced by the result of ``get_prep_lookup()``. Raises
    NotImplementedError when bilateral transforms are present and the
    original ``rhs`` is a QuerySet.
    """
    self.lhs = lhs
    self.rhs = rhs
    self.rhs = self.get_prep_lookup()

    transforms = (
        self.lhs.get_bilateral_transforms()
        if hasattr(self.lhs, 'get_bilateral_transforms')
        else []
    )
    if transforms:
        # Warn the user as soon as possible if they try to apply a bilateral
        # transformation on a nested QuerySet: that won't work.
        # QuerySet is imported here to avoid a circular import.
        from django.db.models.query import QuerySet
        if isinstance(rhs, QuerySet):
            raise NotImplementedError("Bilateral transformations on nested querysets are not supported.")
    self.bilateral_transforms = transforms
def get_queryset(self):
    """
    Return the items for this view: any iterable, possibly a queryset.

    Defaults to `self.queryset`. Always call this method instead of reading
    `self.queryset` directly, because `self.queryset` is evaluated only once
    and its results are then cached for all subsequent requests.

    Override this to return a different queryset depending on the incoming
    request (e.g. only the items belonging to the current user).
    """
    assert self.queryset is not None, (
        "'%s' should either include a `queryset` attribute, "
        "or override the `get_queryset()` method."
        % self.__class__.__name__
    )

    source = self.queryset
    # all() gives a fresh queryset so it is re-evaluated on each request.
    return source.all() if isinstance(source, QuerySet) else source
def get_queryset(self):
    """
    Return ``self.queryset``, freshly cloned when it is a QuerySet or Manager.
    """
    source = self.queryset
    if not isinstance(source, (QuerySet, Manager)):
        return source
    # all() makes sure the queryset is re-evaluated whenever it is used.
    # A `Manager` may also be supplied as the queryset argument: this
    # happens on ModelSerializer fields, where it gives the field a more
    # expressive 'repr' output,
    # e.g. 'MyRelationship(queryset=ExampleModel.objects.all())'.
    return source.all()
def test_model(self):
    """The model should be an instance of Book."""
    # The replaced model should be on both the QuerySet and Query.
    first_queryset = self.all.query._querysets[0]
    self.assertIs(first_queryset.model, first_queryset.query.model)

    # It's still an instance of the original model.
    first = self.all[0]
    self.assertIsInstance(first, Book)

    # But it also has a new superclass.
    mro_names = [
        cls.__module__ + '.' + cls.__name__
        for cls in first.__class__.__mro__
    ]
    self.assertIn('queryset_sequence.QuerySequenceModel', mro_names)

    # Note that a bunch of meta properties get re-labeled. This is OK.
    options = first._meta
    self.assertTrue(options.app_label.startswith('queryset_sequence.'))
    # assertEqual instead of the deprecated assertEquals alias, which was
    # removed in Python 3.12.
    self.assertEqual(options.model_name, 'querysequencemodel')
    self.assertEqual(options.object_name, 'QuerySequenceModel')

    # Django >= 1.9 the label attribute exists. Otherwise, cast to a string.
    object_name = 'QuerySequenceModel'
    try:
        label = options.label
    except AttributeError:
        label = str(options)
        object_name = object_name.lower()
    self.assertTrue(label.startswith('queryset_sequence'))
    self.assertTrue(label.endswith(object_name))
def test_queryset_number(self):
    """Each item carries the index of the QuerySet it came from ('#')."""
    numbers = [getattr(item, '#') for item in self.all._clone()]
    self.assertEqual([0, 0, 1, 1, 1], numbers)
def test_queryset_number_filter(self):
    """Filtering on '#' keeps the QuerySet number of the items unchanged."""
    only_second = self.all.filter(**{'#': 1})
    numbers = [getattr(item, '#') for item in only_second]
    self.assertEqual([1, 1, 1], numbers)
def test_len(self):
    """len() evaluates the sequence and count() then reuses the cache."""
    sequence = self.all._clone()

    # Calling len() evaluates the QuerySet.
    self.assertEqual(len(sequence), 5)
    self.assertIsNotNone(sequence._result_cache)

    # Count should still work (and not hit the database) by using the cache;
    # dropping the query makes sure it cannot be used.
    sequence.query = None
    self.assertEqual(sequence.count(), 5)
def test_iter_cache(self):
    """Iterating the QuerySet a second time is served from the cache."""
    sequence = self.all._clone()

    # The first pass runs two queries.
    with self.assertNumQueries(2):
        titles = [item.title for item in sequence]
    self.assertEqual(titles, TestIterator.EXPECTED)

    # So the second call does nothing.
    with self.assertNumQueries(0):
        titles = [item.title for item in sequence]
    self.assertEqual(titles, TestIterator.EXPECTED)
def test_empty(self):
    """
    filter() keeps working when it leaves an empty QuerySet behind.
    """
    # Filter to nothing; building the filter must not hit the database.
    with self.assertNumQueries(0):
        nothing = self.all.filter(title='')

    self.assertEqual(nothing.count(), 0)
    self.assertIsInstance(nothing, QuerySetSequence)

    # This should not throw an exception.
    items = list(nothing)
    self.assertEqual(len(items), 0)
def test_simplify(self):
    """
    exclude() filters the child QuerySets and collapses to a single child
    QuerySet once all the others become empty.
    """
    # Filter to just Alice's work; this must not hit the database.
    with self.assertNumQueries(0):
        alice_only = self.all.exclude(author=self.bob)

    self.assertEqual(alice_only.count(), 2)
    # TODO
    # self.assertIsNone(alice_only._result_cache)

    # Since we've now filtered down to a single QuerySet, we shouldn't be a
    # QuerySetSequence any longer.
    self.assertIsInstance(alice_only, QuerySet)
def test_empty(self):
    """
    exclude() keeps working when it leaves an empty QuerySet behind.
    """
    # Exclude everything; building the query must not hit the database.
    with self.assertNumQueries(0):
        nothing = self.all.exclude(author__in=[self.alice, self.bob])

    self.assertEqual(nothing.count(), 0)
    self.assertIsInstance(nothing, QuerySetSequence)

    # This should not throw an exception.
    items = list(nothing)
    self.assertEqual(len(items), 0)
def test_order_by_queryset(self):
    """Ordering by QuerySet number first, then by another field, works."""
    # Order by title, but don't interleave each QuerySet.
    with self.assertNumQueries(0):
        ordered = self.all.order_by('#', 'title')

    self.assertEqual(ordered.query.order_by, ['#', 'title'])
    # The child ordering is compared as a tuple on Django >= 2 and as a
    # list before that.
    child_ordering = ('title',) if DJANGO_VERSION >= (2,) else ['title']
    self.assertEqual(
        ordered.query._querysets[0].query.order_by,
        child_ordering,
    )

    expected = [
        # First the Books, in order.
        'Biography',
        'Fiction',
        # Then the Articles, in order.
        'Alice in Django-land',
        'Django Rocks',
        'Some Article',
    ]

    # Ensure that _ordered_iterator isn't called.
    guard = patch(
        'queryset_sequence.QuerySequence._ordered_iterator',
        side_effect=AssertionError('_ordered_iterator should not be called'))
    with guard:
        # Check the titles are properly ordered.
        titles = [item.title for item in ordered]

    self.assertEqual(titles, expected)
def test_single_element(self):
    """Indexing a single element returns the model instance."""
    sequence = self.all._clone()

    first = sequence[0]
    self.assertEqual(first.title, 'Fiction')
    self.assertIsInstance(first, Book)

    # The sequence never gets evaluated since the underlying QuerySet is used.
    self.assertIsNone(sequence._result_cache)
def test_one_QuerySet(self):
    """A slice that stays within one QuerySet returns a plain QuerySet."""
    sequence = self.all._clone()

    sliced = sequence[0:2]
    self.assertIsInstance(sliced, QuerySet)

    # The sequence never gets evaluated since the underlying QuerySet is used.
    self.assertIsNone(sequence._result_cache)

    # Check the data.
    for item in sliced:
        self.assertIsInstance(item, Book)