我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用django.db.models.aggregates.Count()。
def get_usable_luns(cls, queryset):
    """Narrow ``queryset`` to the LUNs that can be used as a Target.

    A LUN is usable when it is not already used by a Target (delegated to
    ``cls.get_unused_luns``) and it is unambiguous where it should be
    mounted, i.e. either:

    * it has exactly one VolumeNode (no HA, but the mount location is
      certain), or
    * at least one of its VolumeNodes is marked as primary.

    Only VolumeNodes whose host has ``not_deleted=True`` are taken into
    account.

    :param queryset: the LUN queryset to start from
    :return: the filtered queryset, annotated with ``has_primary`` and
        ``num_volumenodes``
    """
    unused = cls.get_unused_luns(queryset)
    on_live_hosts = unused.filter(volumenode__host__not_deleted=True)
    annotated = on_live_hosts.annotate(
        has_primary=BoolOr('volumenode__primary'),
        num_volumenodes=Count('volumenode'),
    )
    single_node = Q(num_volumenodes=1)
    with_primary = Q(has_primary=True)
    return annotated.filter(single_node | with_primary)
def get_stats(self):
    """Return per-status row counts, post-processed by ``with_status_name()``.

    Rows are grouped by ``status`` and every group carries ``amount``, the
    count of ``id`` values in that group.
    """
    grouped_by_status = self.values('status')
    counted = grouped_by_status.annotate(amount=Count('id'))
    return counted.with_status_name()
def get_count(self, using):
    """Run a COUNT(*) query under the current filter constraints.

    The count is computed on a clone, so ``self`` is left untouched.

    :param using: passed straight through to ``get_aggregation``
    :return: the number of matching rows; ``0`` when the aggregation
        yields ``None``
    """
    counting_query = self.clone()
    counting_query.add_annotation(Count('*'), alias='__count', is_summary=True)
    aggregated = counting_query.get_aggregation(using, ['__count'])
    total = aggregated['__count']
    return 0 if total is None else total
def get_random_song_list(self, n):
    """Return a list of ``n`` randomly picked songs.

    The result is a plain ``list`` of objects, not a QuerySet. Every pick
    is independent, so the same song may appear more than once.

    :param n: number of songs to pick; ``0`` (or less) yields ``[]``
    :return: list with ``n`` entries
    :raises MusicLibrary.DoesNotExist: when at least one song is requested
        but there are no songs to pick from
    """
    # NOTE(review): the original comment here was lost to an encoding error;
    # it mentioned order_by('?'), presumably as the alternative way of
    # randomising -- confirm with the author.
    count = self.aggregate(count=Count('id'))['count']

    # Check for an empty library once, up front, instead of relying on
    # randint() raising ValueError inside the loop.
    if n > 0 and count < 1:
        raise MusicLibrary.DoesNotExist

    return [self.all()[randint(0, count - 1)] for _ in range(n)]
def fix_results_value_count():
    """Recompute and persist ``value_count`` for every ``Result``.

    Each Result is annotated with the count of ``timeseriesresult__values``;
    that number is copied onto ``value_count`` and the row is saved.
    """
    annotated_results = Result.objects.annotate(
        number_of_values=Count('timeseriesresult__values'),
    )
    for row in annotated_results:
        row.value_count = row.number_of_values
        row.save()
def random(self, user):
    """Return one conjugation chosen at random.

    For a non-anonymous user that has at least one ``MoodTense`` linked
    through ``users``, the choice is restricted to conjugations whose
    ``mood_tense`` is one of those; otherwise every conjugation in ``self``
    is a candidate.

    :param user: the requesting user
    :return: a single conjugation object
    :raises ValueError: from ``random.randint`` when there are no candidate
        conjugations
    """
    conjugations = self
    # NOTE(review): is_anonymous is invoked as a method here; newer Django
    # versions expose it as a property -- confirm the targeted version.
    if not user.is_anonymous():
        mood_tenses = MoodTense.objects.filter(users=user)
        # exists() is enough: only "any rows at all?" matters, and the same
        # queryset is reused for the filter instead of being rebuilt.
        if mood_tenses.exists():
            conjugations = conjugations.filter(mood_tense__in=mood_tenses)

    count = conjugations.aggregate(count=Count('id'))['count']
    random_index = random.randint(0, count - 1)
    # Explicit ordering so the index refers to a well-defined row.
    return conjugations.order_by('id')[random_index]
def filter_popular(self, queryset, value):
    """
    Recommend content that is popular with all users.

    :param queryset: all content nodes for this channel
    :param value: id of currently logged in user, or none if user is anonymous
        (not used by this filter)
    :return: the content nodes belonging to the 10 most accessed content ids;
        when fewer than 50 session logs exist, a random sample of non-topic
        nodes instead (at most 25 unless another size is cached)
    """
    if ContentSessionLog.objects.count() < 50:
        # Not enough session logs: return random content nodes instead.
        # The pks are needed as a list for sample() anyway, so fetch them
        # once and use len() rather than issuing a separate COUNT query.
        pks = list(
            queryset.values_list('pk', flat=True).exclude(kind=content_kinds.TOPIC)
        )
        # NOTE(review): nothing in this function stores a value under this
        # key; unless it is set elsewhere the min(...) fallback always wins.
        count_cache_key = 'content_count_for_popular'
        count = cache.get(count_cache_key) or min(len(pks), 25)
        return queryset.filter(pk__in=sample(pks, count))

    cache_key = 'popular_content'
    # Single lookup: a second cache.get() could return None if the entry
    # expires between the check and the return.
    cached = cache.get(cache_key)
    if cached:
        return cached

    # get the most accessed content nodes
    content_counts_sorted = (
        ContentSessionLog.objects
        .values_list('content_id', flat=True)
        .annotate(Count('content_id'))
        .order_by('-content_id__count')
    )
    # NOTE: the __in filter selects the nodes but does not keep them in
    # popularity order.
    most_popular = queryset.filter(content_id__in=list(content_counts_sorted[:10]))

    # cache the popular results queryset for 10 minutes, for efficiency
    cache.set(cache_key, most_popular, 60 * 10)
    return most_popular
def data(self):
    """Return ``get_query()`` annotated with summed minutes and user counts.

    ``minutes`` is the sum of ``minutes``; ``users`` is the number of
    distinct ``user`` values.
    """
    base_query = self.get_query()
    return base_query.annotate(
        minutes=Sum('minutes'),
        users=Count('user', distinct=True),
    )
def get_queryset(self, request):
    """Extend the parent queryset with participant and message counts.

    Adds ``participants_count`` and ``messages_count``, both computed as
    distinct counts (presumably because joining both relations in one query
    would otherwise inflate each number).
    """
    queryset = super().get_queryset(request)
    queryset = queryset.annotate(
        participants_count=Count('participants', distinct=True),
    )
    queryset = queryset.annotate(
        messages_count=Count('messages', distinct=True),
    )
    return queryset
def get_queryset(self, request):
    """Extend the parent queryset with ``applications_count``.

    ``applications_count`` is the count of related ``instances`` per row.
    """
    parent_queryset = super().get_queryset(request)
    instance_total = Count('instances')
    return parent_queryset.annotate(applications_count=instance_total)
def get_queryset(self, request):
    """Extend the parent queryset with ``terms_count``.

    ``terms_count`` is the count of related ``terms`` per row.
    """
    parent_queryset = super().get_queryset(request)
    term_total = Count('terms')
    return parent_queryset.annotate(terms_count=term_total)
def _get_unrecoverable_nodes(self, user):
    """Return the nodes (workspaces) where ``user`` is the only member.

    Starts from every node the user has a ``Member`` row for, counts each
    node's ``membership`` relation, and drops the nodes with more than one.

    Note that the ``is_recoverable`` annotation left on the result is that
    membership count, not a boolean.

    :param user: the user whose memberships are inspected
    :return: QuerySet of nodes
    """
    member_cls = get_model('accounts', 'Member')
    node_cls = get_model('nodes', 'Node')

    user_node_ids = member_cls.objects.filter(user=user).values_list(
        'node_id', flat=True)
    user_nodes = node_cls.objects.filter(pk__in=user_node_ids)
    with_member_count = user_nodes.annotate(is_recoverable=Count('membership'))
    return with_member_count.exclude(is_recoverable__gt=1)
def random(self):
    """Return one object picked at random from this queryset.

    The number of rows is counted first and a random index into
    ``self.all()`` is then fetched. With no rows, ``randint`` raises
    ``ValueError``.
    """
    total = self.aggregate(count=Count('id'))['count']
    return self.all()[randint(0, total - 1)]
def top_hourly(self):
    """Rank keywords by distinct-IP hits logged within the last hour.

    :return: queryset of ``{'keyword', 'pv'}`` rows, highest ``pv`` first,
        where ``pv`` is the number of distinct ``ip`` values
    """
    window_start = timezone.now() - datetime.timedelta(hours=1)
    recent_logs = self.filter(log_time__gte=window_start)
    per_keyword = recent_logs.values('keyword').annotate(
        pv=Count('ip', distinct=True),
    )
    return per_keyword.order_by('-pv')
def top_daily(self):
    """Rank keywords by distinct-IP hits logged within the last day.

    :return: queryset of ``{'keyword', 'pv'}`` rows, highest ``pv`` first,
        where ``pv`` is the number of distinct ``ip`` values
    """
    window_start = timezone.now() - datetime.timedelta(days=1)
    recent_logs = self.filter(log_time__gte=window_start)
    per_keyword = recent_logs.values('keyword').annotate(
        pv=Count('ip', distinct=True),
    )
    return per_keyword.order_by('-pv')
def top_hourly(self):
    """Rank ``hash_id`` values by distinct-IP hits logged within the last hour.

    :return: queryset of ``{'hash_id', 'pv'}`` rows, highest ``pv`` first,
        where ``pv`` is the number of distinct ``ip`` values
    """
    window_start = timezone.now() - datetime.timedelta(hours=1)
    recent_logs = self.filter(log_time__gte=window_start)
    per_hash = recent_logs.values('hash_id').annotate(
        pv=Count('ip', distinct=True),
    )
    return per_hash.order_by('-pv')
def top_daily(self):
    """Rank ``hash_id`` values by distinct-IP hits logged within the last day.

    :return: queryset of ``{'hash_id', 'pv'}`` rows, highest ``pv`` first,
        where ``pv`` is the number of distinct ``ip`` values
    """
    window_start = timezone.now() - datetime.timedelta(days=1)
    recent_logs = self.filter(log_time__gte=window_start)
    per_hash = recent_logs.values('hash_id').annotate(
        pv=Count('ip', distinct=True),
    )
    return per_hash.order_by('-pv')
def data(self):
    """Return the query for ``self.product`` annotated with ``complete``.

    ``complete`` is the maximum of the ``complete`` field.
    """
    product_query = self.get_query(product=self.product)
    return product_query.annotate(complete=Max('complete'))


# NOTE: the helpers below were already commented out in the original source;
# they are kept here unchanged, only re-flowed onto separate lines.
#def wiki_registrations():
#    return (
#        MediaWikiUser.objects
#        .annotate(year=Substr('registration', 1, 4))
#        .order_by('year')
#        .values('year')
#        .annotate(
#            users=Count('id', distinct=True)
#        )
#    )
#
#
#def wiki_revisions():
#    return (
#        Revision.objects
#        .annotate(year=Substr('rev_timestamp', 1, 4))
#        .order_by('year')
#        .values('year')
#        .annotate(
#            revisions=Count('id')
#        )
#    )
#
#
#def wiki_registrations_year(year):
#    return (
#        MediaWikiUser.objects
#        .annotate(
#            year=Substr('registration', 1, 4),
#            month=Substr('registration', 5, 2)
#        )
#        .filter(year=str(year).encode())
#        .order_by('month')
#        .values('month')
#        .annotate(
#            users=Count('id', distinct=True)
#        )
#    )
#
#
#def wiki_revisions_year(year):
#    return (
#        Revision.objects
#        .annotate(
#            year=Substr('rev_timestamp', 1, 4),
#            month=Substr('rev_timestamp', 5, 2)
#        )
#        .filter(year=str(year).encode())
#        .order_by('month')
#        .values('month')
#        .annotate(
#            revisions=Count('id')
#        )
#    )