我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db.models.QuerySet()。
def get_for_objects(self, queryset):
    """
    Fetch the log entries recorded for every object in ``queryset``.

    :param queryset: The queryset whose objects' log entries are wanted.
    :type queryset: QuerySet
    :return: Log entries for the objects in the queryset; an empty queryset
        when ``queryset`` is not a QuerySet or holds no rows.
    :rtype: QuerySet
    """
    if not isinstance(queryset, QuerySet):
        return self.none()
    if queryset.count() == 0:
        return self.none()

    model = queryset.model
    content_type = ContentType.objects.get_for_model(model)
    pk_values = queryset.values_list(model._meta.pk.name, flat=True)

    # Integer keys are matched on ``object_id``; any other key type is
    # matched on ``object_pk``. The first key decides which one applies.
    if isinstance(pk_values[0], integer_types):
        pk_lookup = 'object_id__in'
    else:
        pk_lookup = 'object_pk__in'

    entries = self.filter(content_type=content_type)
    return entries.filter(Q(**{pk_lookup: pk_values})).distinct()
def _earliest_or_latest(self, field_name=None, direction="-"):
    """
    Variant of QuerySet._earliest_or_latest that orders by pk as a tie-breaker.

    The ordering field is ``field_name`` or, when that is falsy, the model's
    ``get_latest_by`` option. The same ``direction`` prefix is applied to both
    the ordering field and ``pk``. Returns the single object from ``get()``.
    """
    ordering_field = field_name or getattr(self.model._meta, 'get_latest_by')
    assert bool(ordering_field), (
        "earliest() and latest() require either a "
        "field_name parameter or 'get_latest_by' in the model"
    )
    assert self.query.can_filter(), "Cannot change a query once a slice has been taken."

    primary_ordering = '%s%s' % (direction, ordering_field)
    # pk is the secondary ordering (added for Submissions).
    tie_breaker = '%s%s' % (direction, "pk")

    clone = self._clone()
    query = clone.query
    query.set_limits(high=1)
    query.clear_ordering(force_empty=True)
    query.add_ordering(primary_ordering, tie_breaker)
    return clone.get()
def get_queryset(self) -> QuerySet:
    """
    Return the non-user tags matching the ``q`` request parameter.

    Spaces in the term are replaced by underscores. A leading ``-`` or ``^``
    is stored on ``self.modifier`` (``''`` when absent) and removed from the
    term. A term of the form ``scope:name`` must match both parts; otherwise
    the term may match either the tag name or the tag scope.

    :return: distinct Tag queryset, excluding tags whose source is ``'user'``.
    """
    tag_clean = self.request.GET.get('q', '').replace(" ", "_")
    m = re.match(r"^([-^])", tag_clean)
    if m:
        self.modifier = m.group(1)
        # Drop only the leading modifier. str.replace() would also strip the
        # same character from inside the term (e.g. "-sci-fi" -> "scifi").
        tag_clean = tag_clean[len(self.modifier):]
    else:
        self.modifier = ''

    scope_name = tag_clean.split(":", maxsplit=1)
    candidates = Tag.objects.exclude(source='user')
    if len(scope_name) > 1:
        results = candidates.filter(
            Q(name__contains=scope_name[1]),
            Q(scope__contains=scope_name[0]),
        )
    else:
        results = candidates.filter(
            Q(name__contains=tag_clean) | Q(scope__contains=tag_clean)
        )
    return results.distinct()
def eligible_for_use(self, **kwargs: typing.Any) -> QuerySet:
    """Forward to ``eligible_for_use`` on the queryset from ``get_queryset()``.

    :param kwargs: passed through unchanged.
    :return: whatever the queryset's ``eligible_for_use`` returns.
    """
    queryset = self.get_queryset()
    return queryset.eligible_for_use(**kwargs)


# This is commented because it didn't work in MySQL
# def need_new_archive(self, **kwargs):
#
#     return self.annotate(
#         num_archives=Count('archive')
#     ).filter(
#         (
#             (
#                 Q(num_archives=1) &
#                 ~Q(filesize=F('archive__filesize')) &
#                 ~Q(filesize=0)
#             ) |
#             Q(archive__isnull=True)
#         ),
#         **kwargs
#     ).prefetch_related('archive_set').order_by('-create_date')
def products_changed_handler(sender, **kwargs):
    """
    Signal handler that refreshes the product counters of categories.

    The ``categories`` keyword argument may be a ShopCategory instance, a
    single primary key (int or str), a list/tuple/set/ValuesListQuerySet of
    primary keys, or a QuerySet of ShopCategory. For the selected categories
    ``product_count`` is recomputed as the number of visible products,
    ``total_product_count`` is set to that value, and the
    ``categories_changed`` signal is sent with the resulting queryset.

    :raises TypeError: when ``categories`` is none of the accepted kinds.
    """
    target = kwargs.get('categories')
    if isinstance(target, ShopCategory):
        # A single category instance.
        selected = ShopCategory.objects.filter(pk=target.pk)
    elif isinstance(target, (int, str)):
        # A single primary key.
        selected = ShopCategory.objects.filter(pk=target)
    elif isinstance(target, (list, tuple, set, ValuesListQuerySet)):
        # A collection of primary keys.
        selected = ShopCategory.objects.filter(pk__in=target)
    elif isinstance(target, QuerySet) and target.model is ShopCategory:
        # Already a queryset of categories.
        selected = target
    else:
        raise TypeError('Invalid categories for signal "products_changed"')

    visible_products_sql = (
        '(SELECT COUNT(*) '
        'FROM shop_shopproduct AS ssp '
        'WHERE ssp.category_id = shop_shopcategory.id '
        'AND ssp.is_visible = TRUE)'
    )
    with transaction.atomic():
        selected.update(product_count=RawSQL(visible_products_sql, ()))
        selected.update(total_product_count=F('product_count'))

    # NOTE(review): the signal is assumed to be sent after the atomic block
    # has exited -- confirm against the original layout.
    categories_changed.send(ShopCategory, categories=selected)
def test_get_group_filters(self):
    """Group filters built for a group and for one of its users select the group's two permissions."""
    group = Group.objects.create(name='test group')
    granted = Permission.objects.filter(codename__in=['add_user', 'change_user'])
    group.permissions.add(*granted)
    user = User.objects.create_user(username='testuser', password='test123.')
    user.groups.add(group)

    # Same expectations first for the group itself, then for the user.
    for subject in (group, user):
        checker = utils.GraphPermissionChecker(subject)
        filters = checker.get_group_filters()
        permissions = Permission.objects.filter(**filters)
        self.assertIsInstance(permissions, QuerySet)
        self.assertEqual(permissions.count(), 2)
def group(self):
    """
    Build a django channels Group from the ``channel_name`` values of this QuerySet.

    The names are stored on ``self._channel_names``. With no names, a Group
    called ``'emptyname'`` with ``empty = True`` is returned. Otherwise the
    group is named after the md5 hex digest of all names concatenated, and
    every name that does not start with ``'bot-'`` is added as a Channel.
    """
    names = self._channel_names = self.values_list('channel_name', flat=True)
    if not names:
        placeholder = Group('emptyname')
        placeholder.empty = True
        return placeholder

    # The digest covers every name, including the 'bot-' ones skipped below.
    encoded_names = (name.encode('utf-8', errors='replace') for name in names)
    digest = md5(b''.join(encoded_names)).hexdigest()

    combined = Group(name=digest)
    for name in names:
        if not name.startswith('bot-'):
            combined.add(Channel(name))
    return combined
def get_obj_or_api_404(model_class_or_queryset, **lookups):
    """
    Fetch one object matching ``lookups`` or raise NotFoundException.

    The first argument is either a QuerySet, which is queried directly, or a
    model class, which is queried through its ``objects`` manager.
    """
    if isinstance(model_class_or_queryset, QuerySet):
        source = model_class_or_queryset
        model_class = model_class_or_queryset.model
    else:
        source = model_class_or_queryset.objects
        model_class = model_class_or_queryset

    try:
        found = source.get(**lookups)
    except model_class.DoesNotExist:
        message = "{0} instance with {1} not found".format(
            model_class.__name__, str(lookups))
        raise NotFoundException(message)
    return found
def active(self, *args, **kwargs):
    """
    Select the groups whose ``active`` flag is set.

    :param args: extra positional arguments (not used by the filter).
    :type args: list.
    :param kwargs: extra keyword arguments (not used by the filter).
    :type kwargs: dict.
    :return: queryset restricted to active groups.
    :rtype: django.db.models.query.QuerySet.
    """
    criteria = {'active': True}
    return self.filter(**criteria)
def inactive(self, *args, **kwargs):
    """
    Select the groups whose ``active`` flag is cleared.

    :param args: extra positional arguments (not used by the filter).
    :type args: list.
    :param kwargs: extra keyword arguments (not used by the filter).
    :type kwargs: dict.
    :return: queryset restricted to inactive groups.
    :rtype: django.db.models.query.QuerySet.
    """
    criteria = {'active': False}
    return self.filter(**criteria)
def active(self, *args, **kwargs):
    """
    Select the memberships whose ``active`` flag is set.

    :param args: extra positional arguments (not used by the filter).
    :type args: list.
    :param kwargs: extra keyword arguments (not used by the filter).
    :type kwargs: dict.
    :return: queryset restricted to active memberships.
    :rtype: django.db.models.query.QuerySet.
    """
    criteria = {'active': True}
    return self.filter(**criteria)
def inactive(self, *args, **kwargs):
    """
    Select the memberships whose ``active`` flag is cleared.

    :param args: extra positional arguments (not used by the filter).
    :type args: list.
    :param kwargs: extra keyword arguments (not used by the filter).
    :type kwargs: dict.
    :return: queryset restricted to inactive memberships.
    :rtype: django.db.models.query.QuerySet.
    """
    criteria = {'active': False}
    return self.filter(**criteria)
def active(self, *args, **kwargs):
    """
    Select the users whose ``is_active`` flag is set.

    :param args: extra positional arguments (not used by the filter).
    :type args: list.
    :param kwargs: extra keyword arguments (not used by the filter).
    :type kwargs: dict.
    :return: queryset restricted to users with is_active == True.
    :rtype: django.db.models.query.QuerySet.
    """
    criteria = {'is_active': True}
    return self.filter(**criteria)
def get_NAME_serializer(self, field: models.Field, qs: models.QuerySet, obj=None) -> Serializer:
    """
    Return the serializer for the <NAME> field.

    This method (`get_NAME_serializer`) is a placeholder and is not supposed
    to be used directly; calling it always raises.

    :param field: model field
    :param qs: queryset
    :param obj: optional obj passed to method
    :return: Serializer
    :raises NotImplementedError: always.
    """
    # NotImplementedError is an Exception subclass, so callers that caught
    # the previous bare Exception keep working.
    raise NotImplementedError('get_NAME_serializer is not supposed to be used directly')


# noinspection PyPep8Naming,PyMethodMayBeStatic
def get_NAME_queryset(self, field: models.Field, obj=None) -> models.QuerySet:
    """
    Redefine the queryset of the <NAME> field (fk, m2m, etc. querysets).

    This method (`get_NAME_queryset`) is a placeholder and is not supposed
    to be used directly; calling it always raises.

    :param field: model field
    :param obj: optional obj passed to method
    :return: QuerySet()
    :raises NotImplementedError: always.
    """
    # NotImplementedError is an Exception subclass, so callers that caught
    # the previous bare Exception keep working.
    raise NotImplementedError('get_NAME_queryset is not supposed to be used directly')


# noinspection PyPep8Naming,PyMethodMayBeStatic
def get_NAME_dataset_url(self, field: models.Field, obj=None) -> models.QuerySet:
    """
    Redefine all dataset_url bindings.

    Meant for use at runtime (when there is no ability to pass reverse_lazy,
    etc.). This placeholder always raises.

    NOTE(review): the return annotation says QuerySet although the name
    suggests a URL -- confirm the intended return type.

    :param field: model field
    :param obj: optional obj passed to method
    :raises NotImplementedError: always.
    """
    # NotImplementedError is an Exception subclass, so callers that caught
    # the previous bare Exception keep working.
    raise NotImplementedError('get_NAME_dataset_url is not supposed to be used directly')


# noinspection PyProtectedMember
def default_queryset_serializer(qs: models.QuerySet, many: bool = False) -> DRFMimicSerializer:
    """
    Minimal serializer that mimics the default DRF serializer.

    Each item becomes ``{'id': item.id, 'name': str(item)}``.

    :param qs: queryset
    :param many: not used; present to mimic the DRF serializer signature
    :return: DRFMimicSerializer built from the list of rows
    """
    def as_row(item):
        return {'id': item.id, 'name': str(item)}

    rows = [as_row(item) for item in qs]
    return DRFMimicSerializer(data=rows)
def serialize_queryset(self, field: models.Field, qs: models.QuerySet) -> t.Dict:
    """Serialize ``qs`` with the serializer from ``self.get_serializer(field)`` and return its ``data``."""
    serializer_factory = self.get_serializer(field)
    serialized = serializer_factory(qs, many=True)
    return serialized.data
def get_for_model(self, model):
    """
    Get log entries for all objects of a specified type.

    :param model: The model to get log entries for.
    :type model: class
    :return: QuerySet of log entries for the given model; an empty queryset
        when ``model`` is not a ``models.Model`` subclass.
    :rtype: QuerySet
    """
    # Return an empty queryset if the given object is not valid. The
    # isinstance() guard matters: issubclass() raises TypeError when its
    # first argument is not a class (None, an instance, a string, ...).
    if not (isinstance(model, type) and issubclass(model, models.Model)):
        return self.none()

    content_type = ContentType.objects.get_for_model(model)
    return self.filter(content_type=content_type)
def default_filter(self, opts, *args, **kwargs):
    """
    Filter, then sort, in one call.

    Calls ``self.ajax_filter(opts, *args, **kwargs)`` and then
    ``ajax_sorter(opts)`` on its result. Either step can be overridden to
    adjust the QuerySet.

    :param opts: Filter options passed through request.
    :param args: Positional filter options in addition to opts.
    :param kwargs: Keyword filter options in addition to opts.
    :return: Filtered and sorted QuerySet
    """
    filtered = self.ajax_filter(opts, *args, **kwargs)
    return filtered.ajax_sorter(opts)
def __init__(self, field, value):
    """
    Store ``field`` and ``value`` and pick the queryset to work on.

    A field counts as bound when it has a non-None ``attname``. Bound fields
    use every object from the model's default manager; unbound ones get a
    bare ``QuerySet()``.
    """
    self.field = field
    self.attname = getattr(self.field, 'attname', None)
    self.field_bound = self.attname is not None
    if self.field_bound:
        self.qs = self.field.model._default_manager.all()
    else:
        self.qs = QuerySet()
    self.value = value
def get_random_song_list(self, n):
    """
    Return a plain list (not a QuerySet) of ``n`` randomly picked objects.

    Objects are picked by random index instead of ``order_by('?')``, so the
    same object may appear more than once.

    :raises MusicLibrary.DoesNotExist: when an index cannot be drawn because
        ``randint`` raises ValueError (the count is below 1).
    """
    total = self.aggregate(count=Count('id'))['count']
    picked = []
    for _ in range(n):
        try:
            position = randint(0, total - 1)
        except ValueError:
            raise MusicLibrary.DoesNotExist
        picked.append(self.all()[position])
    return picked
def ordered_columns(self) -> QuerySet:
    """Return the value of this object's ``sorted_columns`` attribute."""
    columns = self.sorted_columns
    return columns
def active(self, session=None) -> models.QuerySet:
    """
    Return notifications that are new, recent and belong to an open session.

    Selected rows have status ``STATUS_NEW``, were created within the last
    ten minutes and have a session without an ``end``. When ``session`` is
    truthy the result is further limited to that session.
    """
    base = self.get_queryset()
    cutoff = now() - timedelta(minutes=10)
    pending = base.filter(
        status=TroubleshooterNotification.STATUS_NEW,
        created__gt=cutoff,
        session__end__isnull=True,
    )
    if session:
        return pending.filter(session=session)
    return pending
def get_queryset(self) -> QuerySet:
    """Return the cashdesks with ``is_active=True``, ordered by ``name``."""
    active_desks = Cashdesk.objects.filter(is_active=True)
    return active_desks.order_by('name')
def get_queryset(self) -> QuerySet:
    """Return the cashdesk sessions that have an ``end`` value, latest ``end`` first."""
    ended_sessions = CashdeskSession.objects.filter(end__isnull=False)
    return ended_sessions.order_by('-end')
def get_queryset(self) -> QuerySet:
    """
    Look up preorder positions by secret.

    ``secret`` (when present) is matched exactly, ignoring case. Otherwise a
    ``search`` term of at least 6 characters is matched as a case-insensitive
    prefix. With neither, the result is empty. Results are ordered by ``id``.
    """
    positions = PreorderPosition.objects.all().order_by('id')
    params = self.request.GET
    secret = params.get('secret', None)
    prefix = params.get('search', None)

    if secret is not None:
        return positions.filter(secret__iexact=secret)
    if prefix is not None and len(prefix) >= 6:
        return positions.filter(secret__istartswith=prefix)
    return positions.none()
def get_queryset(self) -> QuerySet:
    """
    Search the entries of one constraint list.

    Both ``listid`` and ``search`` query parameters are required; without
    either the result is empty. A search term of 3 or more characters matches
    ``name`` or ``identifier`` by case-insensitive containment; a shorter
    term must equal ``identifier`` (ignoring case). Results are ordered by
    ``id``.
    """
    entries = ListConstraintEntry.objects.all().order_by('id')
    params = self.request.query_params

    list_id = params.get('listid', None)
    if list_id is None:
        return entries.none()
    entries = entries.filter(list_id=list_id)

    term = params.get('search', None)
    if term is None:
        return entries.none()
    if len(term) >= 3:
        return entries.filter(Q(name__icontains=term) | Q(identifier__icontains=term))
    return entries.filter(identifier__iexact=term)
def get_queryset(self) -> QuerySet:
    """
    Return transaction positions, newest transaction first.

    Optional entries of ``self.filter`` narrow the result: ``cashdesk``
    (applied whenever the key is present), and ``type`` and ``receipt``
    (applied only when present and truthy).
    """
    criteria = self.filter
    positions = TransactionPosition.objects.all()
    positions = positions.order_by('-transaction__datetime')
    positions = positions.select_related('transaction')

    if 'cashdesk' in criteria:
        positions = positions.filter(transaction__session__cashdesk=criteria['cashdesk'])
    if 'type' in criteria and criteria['type']:
        positions = positions.filter(type=criteria['type'])
    if 'receipt' in criteria and criteria['receipt']:
        positions = positions.filter(transaction__receipt_id=criteria['receipt'])
    return positions
def for_content_type_id(self, content_type_id):
    """
    Select the CsvGenerator instances attached to one content type, by id.

    :param content_type_id: ContentType model id
    :type content_type_id: int
    :return: QuerySet of CsvGenerator model instances
    """
    criteria = {'content_type_id': content_type_id}
    return self.filter(**criteria)
def for_content_type(self, content_type):
    """
    Select the CsvGenerator instances attached to one content type.

    :param content_type: ContentType model
    :type content_type: django.contrib.contenttype.models.ContentType
    :return: QuerySet of CsvGenerator model instances
    """
    criteria = {'content_type': content_type}
    return self.filter(**criteria)
def for_model(self, model):
    """
    Select the CsvGenerator instances that apply to a type of model.

    The model is resolved to its ContentType and handed to
    ``for_content_type``.

    :param model: Model class or instance
    :type model: django.db.models.Model
    :return: QuerySet of CsvGenerator model instances
    """
    content_type = ContentType.objects.get_for_model(model)
    return self.for_content_type(content_type)
def setup():
    """
    Make lifter available on django querysets.

    This is a bit dirty: it attaches ``_locally`` to django's base QuerySet
    class as a method named ``locally``.
    """
    QuerySet.locally = _locally
def get_connected_members(self) -> QuerySet:
    """Return the members in ``member_set`` with ``still_connected=True``."""
    members = self.member_set
    return members.filter(still_connected=True)
def get_members_who_finished_race(self) -> QuerySet:
    """
    Return the still-connected members of ``parent_or_self`` when it is a result.

    Returns ``None`` when ``parent_or_self.is_result`` is falsy, despite the
    QuerySet return annotation.
    """
    if not self.parent_or_self.is_result:
        return None
    return self.parent_or_self.member_set.filter(still_connected=True)
def crawl_urls_caller(self, urls: List[str], wanted_filters: QuerySet=None, wanted_only: bool=False):
    """
    Run ``crawl_urls`` and log any failure instead of propagating it.

    Every exception (``BaseException`` included) is caught and its traceback
    is written as an error to the ``viewer.threads`` logger.
    """
    try:
        self.crawl_urls(urls, wanted_filters=wanted_filters, wanted_only=wanted_only)
    except BaseException:
        logging.getLogger('viewer.threads').error(traceback.format_exc())
def crawl_urls(self, urls: List[str], wanted_filters: QuerySet = None, wanted_only: bool = False) -> None:
    """Do nothing; this implementation has an empty body."""
    return None
def sort_tags(tag_list: QuerySet) -> List[Tuple[str, List['Tag']]]:
    """
    Group tags by scope, following the order of ``scope_priorities``.

    Each non-empty group is a ``(scope, tags)`` pair with the tags sorted by
    their ``str()``. Tags whose scope is not in ``scope_priorities`` come
    last, grouped under ``None``.
    """
    grouped = []
    for scope in scope_priorities:
        in_scope = sorted((tag for tag in tag_list if tag.scope == scope), key=str)
        if in_scope:
            grouped.append((scope, in_scope))

    leftovers = sorted(
        (tag for tag in tag_list if tag.scope not in scope_priorities),
        key=str,
    )
    if leftovers:
        grouped.append((None, leftovers))
    return grouped
def create_matches_wanted_galleries_from_providers(wanted_galleries: QuerySet, provider: str, logger: OptionalLogger=None) -> None:
    """
    Search a provider's title matchers for each wanted gallery and record the matches.

    For every matcher returned for ``provider`` and every wanted gallery, the
    matcher is queried with the gallery's ``search_title``. Each result is
    stored as a Gallery with ``dl_type`` set to ``'gallery_match'`` and linked
    to the wanted gallery through a GalleryMatch.

    :param wanted_galleries: wanted galleries to search for.
    :param provider: passed to ``get_matchers`` as ``filter_name``.
    :param logger: optional logger for progress messages.
    """
    # Any failure is caught and logged to 'viewer.threads' rather than raised.
    try:
        matchers = crawler_settings.provider_context.get_matchers(crawler_settings, logger, filter_name=provider, force=True, matcher_type='title')
        for matcher_element in matchers:
            # Only the first item of each matcher element is used.
            matcher = matcher_element[0]
            for wanted_gallery in wanted_galleries:
                results = matcher.create_closer_matches_values(wanted_gallery.search_title)
                for gallery_data in results:
                    # gallery_data[1] holds the gallery values, gallery_data[2]
                    # is stored as the match accuracy.
                    gallery_data[1]['dl_type'] = 'gallery_match'
                    gallery = Gallery.objects.update_or_create_from_values(gallery_data[1])
                    if gallery:
                        GalleryMatch.objects.get_or_create(
                            wanted_gallery=wanted_gallery,
                            gallery=gallery,
                            defaults={'match_accuracy': gallery_data[2]})
                if logger:
                    # NOTE(review): the messages say "panda" regardless of
                    # the ``provider`` argument -- confirm this is intended.
                    if results:
                        logger.info(
                            'Searched for wanted gallery: {} in panda, total possible matches: {}'.format(
                                wanted_gallery.title, wanted_gallery.possible_matches.count()
                            )
                        )
                    else:
                        logger.info(
                            'Searched for wanted gallery: {} in panda, no match found'.format(wanted_gallery.title)
                        )
                # Wait between searches for the configured time.
                time.sleep(crawler_settings.wait_timer)
        if logger:
            logger.info('Search ended.')
    except BaseException:
        thread_logger = logging.getLogger('viewer.threads')
        thread_logger.error(traceback.format_exc())


# WantedGallery
def are_custom(self) -> QuerySet:
    """Return only the rows whose ``source`` is ``'user'``."""
    criteria = {'source': 'user'}
    return self.filter(**criteria)
def not_custom(self) -> QuerySet:
    """Return every row except those whose ``source`` is ``'user'``."""
    criteria = {'source': 'user'}
    return self.exclude(**criteria)
def first_artist_tag(self) -> typing.Optional['Tag']:
    """
    Return the first row whose scope is exactly ``'artist'``.

    The result comes from ``.first()``, so it is a single object (or ``None``
    when nothing matches), not a QuerySet.
    """
    artist_tags = self.filter(scope__exact='artist')
    return artist_tags.first()
def several_archives(self) -> QuerySet:
    """
    Return the rows that have more than one archive.

    Rows are annotated with ``num_archives``, ordered by descending ``id``,
    and ``archive_set`` is prefetched.
    """
    counted = self.annotate(num_archives=Count('archive'))
    with_several = counted.filter(num_archives__gt=1)
    return with_several.order_by('-id').prefetch_related('archive_set')
def non_used_galleries(self, **kwargs: typing.Any) -> QuerySet:
    """
    Return galleries that have no archive, newest ``create_date`` first.

    Galleries with status ``Gallery.DELETED`` or a ``dl_type`` containing
    ``'skipped'`` are left out. ``kwargs`` are applied as extra filters.
    """
    conditions = (
        ~Q(status=Gallery.DELETED),
        ~Q(dl_type__contains='skipped'),
        Q(archive__isnull=True),
    )
    unused = self.filter(*conditions, **kwargs)
    return unused.order_by('-create_date')
def eligible_for_use(self, **kwargs: typing.Any) -> QuerySet:
    """Return the rows whose status is not ``Gallery.DELETED``, narrowed by ``kwargs``."""
    not_deleted = ~Q(status=Gallery.DELETED)
    return self.filter(not_deleted, **kwargs)
def filter_order_created(self, **kwargs: typing.Any) -> QuerySet:
    """Filter by ``kwargs`` and order by descending ``create_date``."""
    matching = self.filter(**kwargs)
    return matching.order_by('-create_date')
def filter_order_modified(self, **kwargs: typing.Any) -> QuerySet:
    """Filter by ``kwargs`` and order by descending ``last_modified``."""
    matching = self.filter(**kwargs)
    return matching.order_by('-last_modified')
def after_posted_date(self, date: datetime) -> QuerySet:
    """Return the rows with ``posted`` on or after ``date``, ordered by ascending ``posted``."""
    posted_since = self.filter(posted__gte=date)
    return posted_since.order_by('posted')
def filter_non_existent(self) -> QuerySet:
    """Forward to ``several_archives`` on the queryset from ``get_queryset()``."""
    queryset = self.get_queryset()
    return queryset.several_archives()
def non_used_galleries(self, **kwargs: typing.Any) -> QuerySet:
    """Forward to ``non_used_galleries`` on the queryset from ``get_queryset()``.

    :param kwargs: passed through unchanged.
    """
    queryset = self.get_queryset()
    return queryset.non_used_galleries(**kwargs)