我们从 Python 开源项目中,提取了以下 49 个代码示例,用于说明如何使用 django.db.models.Count()。
def make_inactive_productlist_query(queryset): now = timezone.now() # Create a query of things are definitively inactive. Some of the ones # filtered here might be out of stock, but we include that later. inactive_candidates = ( queryset .exclude( Q(active=True) & (Q(deactivate_date=None) | Q(deactivate_date__gte=now))) .values("id") ) inactive_out_of_stock = ( queryset .filter(sale__timestamp__gt=F("start_date")) .annotate(c=Count("sale__id")) .filter(c__gte=F("quantity")) .values("id") ) return ( queryset .filter( Q(id__in=inactive_candidates) | Q(id__in=inactive_out_of_stock)) )
def getRegistrationTypesAveragesByYear():
    """For each year with registrations, return percentage breakdowns.

    Each Case maps a matching registration to 100 (else 0), so Avg over
    the annotation yields a percentage directly. Returns a list of dicts
    with keys Student, Door, DropIn, Cancelled and year.
    """
    srs = EventRegistration.objects.all()
    # Distinct years that have at least one registration.
    eligible_years = [x['event__year'] for x in srs.values('event__year').annotate(Count('event__year'))]
    eligible_years.sort()
    year_averages = []
    for year in eligible_years:
        # NOTE(review): 'door' scores payAtDoor=False as 100 -- verify the
        # intended polarity of this metric.
        this_year_results = srs.filter(event__year=year).annotate(
            student=Case(When(registration__student=True,then=100),default=0,output_field=IntegerField()),
            door=Case(When(registration__payAtDoor=False,then=100),default=0,output_field=IntegerField()),
            droppedIn=Case(When(dropIn=True,then=100),default=0,output_field=IntegerField()),
            cancellation=Case(When(cancelled=True,then=100),default=0,output_field=IntegerField()),
        ).aggregate(Student=Avg('student'),Door=Avg('door'),DropIn=Avg('droppedIn'),Cancelled=Avg('cancellation'),year=Min('event__year'))
        year_averages.append(this_year_results)
    return year_averages
def check_common_set_constraints(self): """ Checks conditions that are true for most modern sets. """ # Only the basic lands should have multiple printings. multiple_printings = Card.objects.annotate(count=Count('printings')).filter(count__gte=2) basic_lands = {'Forest', 'Island', 'Mountain', 'Plains', 'Swamp'} if multiple_printings: self.assertQuerysetEqual(multiple_printings, basic_lands, ordered=False, transform=lambda x: x.name) # Every printing has a rarity. self.assertFalse(Printing.objects.filter(rarity__isnull=True)) # No cards of special rarity appear. self.assertFalse(Printing.objects.filter(rarity=Printing.Rarity.SPECIAL)) # Basic lands are marked appropriately. for printing in Printing.objects.filter(rarity=Printing.Rarity.BASIC_LAND): self.assertIn(printing.card.name, basic_lands) # Some printings have flavor text. self.assertTrue(Printing.objects.exclude(flavor_text='').exists())
def make_active_productlist_query(queryset): now = timezone.now() # Create a query for the set of products that MIGHT be active. Might # because they can be out of stock. Which we compute later active_candidates = ( queryset .filter( Q(active=True) & (Q(deactivate_date=None) | Q(deactivate_date__gte=now))) ) # This query selects all the candidates that are out of stock. candidates_out_of_stock = ( active_candidates .filter(sale__timestamp__gt=F("start_date")) .annotate(c=Count("sale__id")) .filter(c__gte=F("quantity")) .values("id") ) # We can now create a query that selects all the candidates which are not # out of stock. return ( active_candidates .exclude( Q(start_date__isnull=False) & Q(id__in=candidates_out_of_stock)))
def stars_employee_list_group_by_category(request, employee_id):
    """
    Returns stars list from employee grouped by categories
    ---
    serializer: stars.serializers.StarEmployeeCategoriesSerializer
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden, authentication credentials were not provided
    - code: 404
      message: Not found
    """
    if request.method == 'GET':
        employee = get_object_or_404(Employee, pk=employee_id)
        # Count the employee's received stars per category, most-starred first.
        employee_stars = Star.objects.filter(to_user=employee).values(
            'category__pk', 'category__name').annotate(num_stars=Count('category')).order_by('-num_stars', 'category__name')
        paginator = PageNumberPagination()
        result = paginator.paginate_queryset(employee_stars, request)
        serializer = StarEmployeeCategoriesSerializer(result, many=True)
        return paginator.get_paginated_response(serializer.data)
def stars_employee_list_group_by_keyword(request, employee_id):
    """
    Returns stars list from employee grouped by keywords
    ---
    serializer: stars.serializers.StarEmployeeKeywordsSerializer
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden, authentication credentials were not provided
    - code: 404
      message: Not found
    """
    if request.method == 'GET':
        employee = get_object_or_404(Employee, pk=employee_id)
        # Count the employee's received stars per keyword, most-starred first.
        employee_stars = Star.objects.filter(to_user=employee).values(
            'keyword__pk', 'keyword__name').annotate(num_stars=Count('keyword')).order_by('-num_stars', 'keyword__name')
        paginator = PageNumberPagination()
        result = paginator.paginate_queryset(employee_stars, request)
        serializer = StarEmployeeKeywordsSerializer(result, many=True)
        return paginator.get_paginated_response(serializer.data)
def family_list(request):
    """List product families whose name starts with the requested letter.

    ?start_letter=<c> selects the letter (only its first character is
    used); '#' selects names that do not begin with an ASCII letter.
    Defaults to 'a'.
    """
    start_letter = request.GET.get('start_letter')
    if start_letter:
        first_letter = start_letter[0]
    else:
        first_letter = 'a'
    families = ProductFamily.objects \
        .prefetch_related('group') \
        .annotate(Count('file')) \
        .order_by('name')
    if first_letter == '#':
        families = families.exclude(name__regex=r'^[A-Za-z]')
    else:
        families = families.filter(name__istartswith=first_letter)
    all_letters = '#' + string.ascii_lowercase
    context = {'families': families,
               'first_letter': first_letter,
               'all_letters': all_letters}
    return render(request, 'msdn/family_list.html', context)
def get_queryset(self):
    """Get queryset.

    Annotates each object with aggregate statistics over its books.

    NOTE(review): combining several aggregates over different multi-valued
    joins (books vs. books__order_lines) in one query can inflate
    Count/Sum results through join multiplication -- verify the numbers
    against expectations.
    """
    return self.model.objects.all() \
        .only('id', 'salutation', 'name', 'email') \
        .annotate(
            number_of_books=Count('books'),
            first_book_published_on=Min('books__publication_date'),
            last_book_published_on=Max('books__publication_date'),
            lowest_book_price=Min('books__price'),
            highest_book_price=Max('books__price'),
            average_book_price=Avg('books__price'),
            average_number_of_pages_per_book=Avg('books__pages'),
            number_of_books_sold=Count('books__order_lines'),
            total_amount_earned=Sum(
                'books__order_lines__book__price'
            )
        )
def test_annotate_with_some_expressions(self):
    """Count() combined with arithmetic operators annotates correctly.

    The expected 'c' values (0 and 1) show the division is evaluated as
    integer division by the database.
    """
    Blog.objects.create(
        category=Category.objects.get(name='Birds'),
        title_nl='Gull'
    )
    qs = Category.objects.annotate(
        a=models.Count('blog__title_nl') + 1,
        b=1 + models.Count('blog__title_nl'),
        c=1 / models.Count('blog__title_nl'),
        d=4 * models.Count('blog__title_nl')
    )
    self.assertEquals(
        set(qs.values_list('name', 'a', 'b', 'c', 'd')),
        {
            ('Birds', 3, 3, 0, 8),
            ('Mammals', 2, 2, 1, 4)
        }
    )
def create_assignments(cls, proposal, origin=AUTO_ASSIGNED_INITIAL):
    """Assign the three least-loaded eligible reviewers to *proposal*.

    Eligible reviewers are users in the "reviewers" group who are not a
    speaker on the proposal and have either no assignments or none they
    opted out of; they are ordered by existing assignment count.
    """
    speakers = [proposal.speaker] + list(proposal.additional_speakers.all())
    reviewers = User.objects.exclude(
        pk__in=[
            speaker.user_id
            for speaker in speakers
            if speaker.user_id is not None
        ]
    ).filter(
        groups__name="reviewers",
    ).filter(
        Q(reviewassignment__opted_out=False) | Q(reviewassignment=None)
    ).annotate(
        num_assignments=models.Count("reviewassignment")
    ).order_by(
        "num_assignments",
    )
    for reviewer in reviewers[:3]:
        cls._default_manager.create(
            proposal=proposal,
            user=reviewer,
            origin=origin,
        )
def group_by_person_name(self, candidacies_q):
    """
    Return a list of QuerySets for Candidacies with the same Person name.
    """
    # Names that appear on more than one of the candidacies.
    dupe_names = candidacies_q.values('person__name').annotate(
        row_count=Count('id'),
    ).order_by().filter(row_count__gt=1)
    return [
        candidacies_q.filter(person__name=row['person__name']).order_by()
        for row in dupe_names.all()
    ]
def group_by_other_name(self, candidacies_q):
    """
    Return a list of QuerySets for Candidacies with shared other_name.
    """
    # get the distinct count of person_ids for each other_name
    # linked to a person who's linked to one of the contest's candidacies
    shared = candidacies_q.values('person__other_names__name').annotate(
        row_count=Count('person_id', distinct=True),
    ).order_by().filter(row_count__gt=1)
    # Skip the null/empty other_name bucket.
    return [
        candidacies_q.filter(
            person__other_names__name=row['person__other_names__name']
        ).order_by()
        for row in shared.all()
        if row['person__other_names__name']
    ]
def group_by_party(self, candidacies_q):
    """
    Return a list of QuerySets for Candidacies with the same party.
    """
    # Parties that appear on more than one candidacy.
    dupe_parties = candidacies_q.values('party').annotate(
        row_count=Count('id'),
    ).order_by().filter(row_count__gt=1)
    return [
        candidacies_q.filter(party=row['party']).order_by()
        for row in dupe_parties.all()
    ]
def dedupe_person_ids(person):
    """
    Remove duplicate PersonIdentifier objects linked to person.
    """
    filer_ids = person.identifiers.filter(scheme='calaccess_filer_id')
    # Filer ids that occur more than once for this person.
    dupe_filer_ids = filer_ids.values("identifier").annotate(
        row_count=Count('id'),
    ).order_by().filter(row_count__gt=1)
    for i in dupe_filer_ids.all():
        # delete all rows with that filer_id
        person.identifiers.filter(identifier=i['identifier']).delete()
        # then re-add the one
        person.identifiers.create(
            scheme='calaccess_filer_id',
            identifier=i['identifier'],
        )
    return person
def handle(self, *args, **kwargs):
    """Print a per-product sales report with reversed transactions
    deducted, then the total number of tickets sold.

    NOTE(review): Count('id') and Count('reverses') in one annotate can be
    inflated by join multiplication -- confirm against raw data.
    """
    total = 0
    agg = TransactionPosition.objects\
        .order_by('product')\
        .values("product__name", "product__price")\
        .annotate(total=Count('id'), reverses=Count('reverses'))
    for line in agg:
        # Net sales for this product = all positions minus reversed ones.
        count = line['total'] - line['reverses']
        self.stdout.write("{line[product__name]:30} {line[product__price]:>20} EUR {count}".format(
            line=line, count=count)
        )
        if 'ticket' in line['product__name']:
            total += count
    self.stdout.write(self.style.SUCCESS("Total tickets: {total}".format(total=total)))
def _get_analysis_chart(self):
    """Return {'YYYY-MM-DD': {verify_state: count}} for posts created in
    the last seven days."""
    week_ago = now().today() - timedelta(days=7)
    per_day = Post.posts \
        .filter(created_time__gte=week_ago) \
        .annotate(day=TruncDay('created_time')) \
        .values('verify', 'day') \
        .annotate(cnt=Count('id')) \
        .values('day', 'verify', 'cnt') \
        .order_by()
    chart = {}
    for row in per_day:
        day_key = row['day'].strftime('%Y-%m-%d')
        chart.setdefault(day_key, {})[row['verify']] = row['cnt']
    return chart
def _get_user_chart(self):
    """Return [{'level': display_name, 'count': n}] aggregated over users."""
    per_level = User.objects\
        .values('level') \
        .annotate(cnt=Count('id')) \
        .values('level', 'cnt') \
        .order_by()
    level_names = dict(USER_LEVEL)
    return [
        {'level': level_names.get(row['level']), 'count': row['cnt']}
        for row in per_level
    ]
def _get_post_chart(self):
    """Return [{'level': display_name, 'count': n}] aggregated over posts."""
    per_level = Post.posts \
        .values('level') \
        .annotate(cnt=Count('id')) \
        .values('level', 'cnt') \
        .order_by()
    level_names = dict(LEVEL_STATUS_CHOICES)
    return [
        {'level': level_names.get(row['level']), 'count': row['cnt']}
        for row in per_level
    ]
def get_specific_filtered_todo_tickets(where, ids, priority, status, treated_by, order_by, limit, offset):
    """
    Returns a list of `abuse.models.Ticket` dict-mapping based on multiple filters
    """
    # NOTE(review): the slice is [:limit * offset] rather than the usual
    # pagination slice [offset * limit:(offset + 1) * limit] -- confirm
    # this is intentional.
    return Ticket.objects.filter(
        where,
        ~Q(id__in=ids),
        priority=priority,
        status__in=status,
        **treated_by
    ).values(
        *TICKET_FIELDS
    ).order_by(
        *order_by
    ).annotate(
        attachedReportsCount=Count('reportTicket')
    ).distinct()[:limit * offset]
def status_summary(self):
    """Get interface status summary.

    Buckets every peering session into a label via nested CASE
    expressions, then counts sessions per label. Returns a values
    queryset of {'label': ..., 'value': count}.
    """
    base_query_set = super(PeeringSessionManager, self).get_queryset()
    summary = base_query_set.annotate(
        label=models.Case(
            models.When(provisioning_state=2, then=models.Case(
                models.When(admin_state=2, then=models.Case(
                    models.When(operational_state=6, then=models.Value('Up')),
                    default=models.Value('Down')
                )),
                default=models.Value('Admin Down')
            )),
            models.When(provisioning_state=1, then=models.Value('Provisioning')),
            default=models.Value('None'),
            output_field=models.CharField()
        )).values('label').annotate(value=models.Count('label'))
    return summary
def posts_issues():
    """Create data-quality issues for Posts in every jurisdiction.

    'many-memberships' / 'few-memberships' flag posts whose membership
    count exceeds / falls short of maximum_memberships. Any unknown issue
    type raises so the importer is updated explicitly.
    """
    all_jurs = Jurisdiction.objects.order_by('name')
    for jur in all_jurs:
        count = 0
        issues = IssueType.get_issues_for('post')
        for issue in issues:
            if issue == 'many-memberships':
                queryset = Post.objects.filter(
                    organization__jurisdiction=jur).annotate(
                    num=Count('memberships')).filter(
                    num__gt=F('maximum_memberships'))
                count += create_issues(queryset, issue, jur)
            elif issue == 'few-memberships':
                queryset = Post.objects.filter(
                    organization__jurisdiction=jur).annotate(
                    num=Count('memberships')).filter(
                    num__lt=F('maximum_memberships'))
                count += create_issues(queryset, issue, jur)
            else:
                raise ValueError("Posts Importer needs "
                                 "update for new issue.")
        print("Imported Post Related {} Issues for {}".format(count, jur.name)
              )
def _get_unverified_reports(self, start, end):
    """
    Get unverified reports.

    Unverified reports are reports on project which have a reviewer
    assigned but are not verified in given time frame.
    """
    queryset = Report.objects.filter(
        date__range=[start, end],
        verified_by__isnull=True
    )
    # Keep only reports whose project actually has reviewers assigned.
    queryset = queryset.annotate(
        num_reviewers=Count('task__project__reviewers')
    )
    queryset = queryset.filter(num_reviewers__gt=0)
    return queryset
def query_user_meta_count(user_id, is_follow=True, use_cache=True):
    """Return the follow or fans count for a user, with caching.

    :param user_id: primary key of the user
    :param is_follow: True -> count of users this user follows;
                      False -> count of this user's fans (followers)
    :param use_cache: whether to consult and populate the cache
    """
    cache_key = CACHE_FANS_COUNT_KEY + str(user_id)
    if is_follow:
        cache_key = CACHE_FOLLOW_COUNT_KEY + str(user_id)
    if use_cache:
        count = cache.get(cache_key)
        # BUG FIX: a cached value of 0 is valid -- only None is a miss.
        # The old `if count:` re-queried the DB on every call for users
        # with zero follows/fans.
        if count is not None:
            return count
    if is_follow:
        count = UserFollow.objects.filter(user_id=user_id, status=1).aggregate(Count("id"))
    else:
        count = UserFollow.objects.filter(follow_user=user_id, status=1).aggregate(Count("id"))
    count = count["id__count"]
    cache.set(cache_key, count, CACHE_COUNT_TIME)
    return count
def query_published_article_count(user_id, use_cache=True):
    """Return the number of published articles for a user, with caching.

    :param user_id: primary key of the user
    :param use_cache: whether to consult and populate the cache
    """
    cache_key = CACHE_ARTICLE_COUNT + str(user_id)
    if use_cache:
        count = cache.get(cache_key)
        # BUG FIX: a cached value of 0 is valid -- only None is a miss.
        if count is not None:
            return count
    count = BlogArticle.objects.filter(user_id=user_id, status=1).aggregate(Count("id"))
    count = count["id__count"]
    # BUG FIX: cache the result unconditionally; the old `if count:` never
    # cached zero, so users without articles hit the DB every time.
    cache.set(cache_key, count, CACHE_TIME)
    return count
def package_list(request, template_name="package/package_list.html"):
    """Render every category with its nine most popular packages."""
    categories = []
    for category in Category.objects.annotate(package_count=Count("project")):
        element = {
            "title": category.title,
            "description": category.description,
            "count": category.package_count,
            "slug": category.slug,
            "title_plural": category.title_plural,
            # Top 9 by downloads, then watchers, then name.
            "packages": category.project_set.annotate(usage_count=Count("usage")).order_by("-pypi_downloads", "-repo_watchers", "name")[:9]
        }
        categories.append(element)
    return render(
        request,
        template_name,
        {
            "categories": categories,
            "dpotw": Dpotw.objects.get_current(),
            "gotw": Gotw.objects.get_current(),
        }
    )
def post(self, request, target_id):
    """Return the output definition of the Slice that matches exactly the
    requested feature set for *target*; an empty list when no unique
    match exists.
    """
    target = get_object_or_404(Feature, pk=target_id)
    feature_ids = request.data.get('features') or []
    features_queryset = Feature.objects.filter(id__in=feature_ids, dataset=target.dataset)
    # Make sure that we filtered for all feature ids
    if features_queryset.count() != len(feature_ids) and len(feature_ids) > 0:
        return Response(status=HTTP_404_NOT_FOUND, data={'detail': 'Not found.'})
    result = ResultCalculationMap.objects.filter(target=target).last()
    # Candidate slices must have exactly as many features as requested...
    slices_queryset = Slice.objects.filter(result_calculation_map=result).annotate(
        feature_count=Count('features')).filter(feature_count=len(feature_ids))
    # ...and contain every requested feature.
    for feature in features_queryset.all():
        slices_queryset = slices_queryset.filter(features=feature)
    if slices_queryset.count() == 1:
        output_definition = slices_queryset.last().output_definition
    else:
        return Response([])
    return Response(output_definition)
def talk_email(request):
    """Compose and send a bulk email to the speakers of selected talks.

    Talk ids come from the session; a confirmation step first shows how
    many mails will go out before anything is sent.
    """
    talks = Talk.objects.filter(pk__in=request.session.get('talk-email-list', []))
    # Total number of distinct speakers across the selected talks.
    count = talks.annotate(speakers_count=Count('speakers', distinct=True)).aggregate(Sum('speakers_count'))['speakers_count__sum']
    if not talks.exists():
        messages.error(request, _('Please select some talks.'))
        return redirect('talk-list')
    form = SendTalkMailForm(request.POST or None,
                            initial=request.session.get('talk-email-stored'),
                            talks=talks)
    if request.method == 'POST' and form.is_valid():
        subject = form.cleaned_data['subject']
        body = form.cleaned_data['body']
        # Remember the draft so a round-trip keeps the composed mail.
        request.session['talk-email-stored'] = {'subject': subject, 'body': body}
        if form.cleaned_data['confirm']:
            sent = talk_email_send(talks, subject, body)
            messages.success(request, _('%(count)d mails have been sent.') % {'count': sent})
            del request.session['talk-email-list']
            return redirect('talk-list')
        else:
            # BUG FIX: "Your ready" -> "You're ready" in user-facing text.
            messages.info(request, _("You're ready to send %(count)d emails.") % {'count': count})
    else:
        # Require a second submit before sending: hide the confirm box.
        form.fields.pop('confirm')
    return render(request, 'cfp/staff/talk_email.html', {
        'talks': talks,
        'form': form,
    })
def public_stats(request: HttpRequest) -> HttpResponse:
    """Display public galleries and archives stats."""
    if not crawler_settings.urls.enable_public_stats:
        if not request.user.is_staff:
            raise Http404("Page not found")
        else:
            return render_error(request, "Page disabled by settings (urls: enable_public_stats).")
    stats_dict = {
        "n_archives": Archive.objects.filter(public=True).count(),
        # File-size aggregates over public archives with a known size.
        "archive": Archive.objects.filter(public=True).filter(filesize__gt=0).aggregate(
            Avg('filesize'), Max('filesize'), Min('filesize'), Sum('filesize')),
        "n_tags": Tag.objects.filter(gallery_tags__public=True).distinct().count(),
        # Ten most-used tags on public galleries.
        "top_10_tags": Tag.objects.filter(gallery_tags__public=True).distinct().annotate(
            num_archive=Count('gallery_tags')).order_by('-num_archive')[:10]
    }
    d = {'stats': stats_dict}
    return render(request, "viewer/public_stats.html", d)
def eligible_for_use(self, **kwargs: typing.Any) -> QuerySet:
    """Proxy to the queryset's eligible_for_use(), forwarding all filters."""
    return self.get_queryset().eligible_for_use(**kwargs)

#
# This is commented because it didn't work in MySQL
# def need_new_archive(self, **kwargs):
#
#     return self.annotate(
#         num_archives=Count('archive')
#     ).filter(
#         (
#             (
#                 Q(num_archives=1) &
#                 ~Q(filesize=F('archive__filesize')) &
#                 ~Q(filesize=0)
#             ) |
#             Q(archive__isnull=True)
#         ),
#         **kwargs
#     ).prefetch_related('archive_set').order_by('-create_date')
def handle(self, *args, **options):
    """Delete tweets older than --days (default 7) in batches of --step
    (default 10000), then delete users left with no authored tweets."""
    step = options.get('step', 10000)
    limit_date = datetime.datetime.now() - datetime.timedelta(days=options.get('days', 7))
    print(limit_date)
    queryset = Tweet.objects.filter(datetime__lt=limit_date)
    # The queryset re-evaluates on every pass, so it shrinks as we delete.
    while queryset.exists():
        tweet_ids = [t.pk for t in queryset[:step]]
        Tweet.objects.filter(pk__in=tweet_ids).delete()
        print('deleted', len(tweet_ids))
    # Users with zero remaining tweets ('author' is the reverse relation).
    user_ids = User.objects.annotate(Count('author')).filter(author__count=0).values_list('pk', flat=True)
    for pks in partition_all(step, user_ids):
        User.objects.filter(pk__in=pks).delete()
        print('deleted', len(pks), 'users')
def common(request):
    """Shared template context for the blog: sidebar data and site info."""
    try:
        mysite = SiteDetail.objects.latest('pk')
    except SiteDetail.DoesNotExist:
        mysite = None
    context = {
        # Categories ordered by how many posts they contain.
        'categories': Category.objects.annotate(
            num_posts=Count('post')).order_by('-num_posts'),
        # Ten most-used tags.
        'tags': Tag.objects.annotate(
            num_posts=Count('post')).order_by('-num_posts')[:10],
        # Ten newest comments (annotated with their reply counts).
        'comments': Comment.objects.annotate(
            num_recomments=Count('recomment')).order_by('-created_at')[:10],
        'links': Link.objects.all(),
        'analytics': Analytics.objects.all(),
        'ads': Ads.objects.all(),
        'global_form': PostSerachForm(request.GET),
        'mysite': mysite,
        'popular_post_list': PopularPost.objects.order_by('-page_view'),
    }
    return context
def project(request, pk):
    """Project dashboard: responder leaderboard with trailing-7-day counts."""
    project = get_object_or_404(Project, pk=pk)
    seven_days_ago = timezone.now() - timedelta(days=7)
    # user id -> number of responses in the last 7 days
    count_last_week = {
        i["user"]: i["total"]
        for i in ItemResponse.objects.filter(created_at__gte=seven_days_ago, item__task__project=project).values("user").annotate(total=Count("user"))
    }
    leaders = [
        {"user": User.objects.get(pk=i["user"]),
         "count": i["total"],
         "trailing_7_days": count_last_week.get(i["user"])}
        for i in ItemResponse.objects.filter(item__task__project=project).values("user").annotate(total=Count("user")).order_by("-total")
    ]
    return render(request, "project.html", {
        "project": project,
        "is_member": project.team.is_member(request.user),
        "leaders": leaders,
    })
def get_similar_target_with_sdk_files(self):
    """
    Get the most recent similar target with TargetSDKFiles associated
    with it, for the purpose of cloning those files onto this target.
    """
    similar_target = None
    candidates = self.get_similar_targets()
    if candidates.count() == 0:
        return similar_target
    # annotate with the count of files, to exclude any targets which
    # don't have associated files
    candidates = candidates.annotate(num_files=Count('targetsdkfile'))
    query = Q(task=self.task) & Q(num_files__gt=0)
    candidates = candidates.filter(query)
    if candidates.count() > 0:
        # BUG FIX: order_by() returns a NEW queryset; the original call
        # discarded its result, so .last() ran on an unordered queryset
        # and "most recent" was not guaranteed.
        candidates = candidates.order_by('build__completed_on')
        similar_target = candidates.last()
    return similar_target
def setup_queryset(self, *args, **kwargs): queryset = Project.objects.all() # annotate each project with its number of builds queryset = queryset.annotate(num_builds=Count('build')) # exclude the command line builds project if it has no builds q_default_with_builds = Q(is_default=True) & Q(num_builds__gt=0) queryset = queryset.filter(Q(is_default=False) | q_default_with_builds) # order rows queryset = queryset.order_by(self.default_orderby) self.queryset = queryset # columns: last activity on (updated) - DEFAULT, project (name), release, # machine, number of builds, last build outcome, recipe (name), errors, # warnings, image files
def add_visitors_for_applicants(apps, schema_editor):
    """Data migration: give every Applicant a Visitor, backfilling the
    visitor's first_visit from the applicant's earliest ApplicationEvent,
    then roll up visitors that ended up linked to multiple applicants."""
    db_alias = schema_editor.connection.alias
    Applicant = apps.get_model('intake', 'Applicant')
    Visitor = apps.get_model('intake', 'Visitor')
    ApplicationEvent = apps.get_model('intake', 'ApplicationEvent')
    applicants = Applicant.objects.using(db_alias).all()
    for app in applicants:
        if not app.visitor_id:
            visitor = Visitor()
            visitor.save()
            app.visitor_id = visitor.id
            app.save()
            # NOTE(review): assumes every applicant has at least one
            # ApplicationEvent; first() returning None would raise here.
            first_app_event = ApplicationEvent.objects.using(db_alias).filter(
                applicant_id=app.id).order_by('time').first()
            visitor.first_visit = first_app_event.time
            visitor.save()
    for visitor in Visitor.objects.using(db_alias).annotate(
            applicant_count=Count('applicant')).filter(applicant_count__gt=1):
        roll_up_duplicate_applicants_per_visitor(visitor)
def setUpTestData(cls):
    """Build per-organization submission and bundle fixtures, separating
    submissions shared between multiple organizations ("combo")."""
    super().setUpTestData()
    cls.have_a_fillable_pdf()
    org_subs = []
    # Submissions attached to more than one application span multiple orgs.
    cls.combo_submissions = list(
        models.FormSubmission.objects.annotate(
            apps_count=Count('applications')
        ).filter(apps_count__gt=1))
    for org in cls.orgs:
        subs = models.FormSubmission.objects.filter(
            applications__organization=org)
        # Per-org fixtures exclude the shared submissions.
        subs = list(set(subs) - set(cls.combo_submissions))
        setattr(cls, org.slug + "_submissions", subs)
        org_subs += subs
        setattr(
            cls,
            org.slug + "_bundle",
            models.ApplicationBundle.objects.filter(
                organization=org).first())
    cls.submissions = list(
        set(org_subs) | set(cls.combo_submissions)
    )
def get_orgs_that_might_need_a_bundle_email_today():
    """Returns Organizations which:
        - are receiving agencies
        - are checking notifications
        - have at least one user
        - have at least one submission
        and if today is the weekend:
        - should be notified on weekends
    """
    # BUG FIX: two Count() annotations over different reverse relations in
    # a single query multiply each other through the SQL joins, inflating
    # both counts; distinct=True keeps each count correct for any caller
    # that reads the annotations. The >=1 filters behave the same.
    orgs = Organization.objects.annotate(
        profile_count=Count('profiles', distinct=True),
        submission_count=Count('submissions', distinct=True),
    ).filter(
        is_checking_notifications=True,
        is_receiving_agency=True,
        profile_count__gte=1,
        submission_count__gte=1
    )
    if is_the_weekend():
        return orgs.filter(notify_on_weekends=True)
    return orgs
def get(self, request):
    """Send the logged-in user into a free Gomoku room, creating a new
    room (with its chat) when none is empty."""
    if not self.request.user.is_authenticated():
        return redirect('mainPage:index')
    elif self.request.user.is_authenticated():
        u = get_object_or_404(GomokuOnline, user=self.request.user)
        #u = GomokuOnline.objects.get(user=self.request.user)
        # Already in a game or queue: send the user back to their room.
        if u.playing or u.readyToPlay or u.wantToPlay:
            return redirect('games:gomokuJoinGame', room_id=u.room.id)
        try:
            # First room with no players currently in it.
            room = Room.objects.annotate(Count('gomokuonline', distinct=True)).filter(gomokuonline__count=0)[:1].get()
        except Room.DoesNotExist:
            room = Room()
            room.save()
            chat = Chat()
            chat.room = room
            chat.save()
        u.room = room
        u.save()
        return redirect('games:gomokuJoinGame', room_id=room.id)
def get(self, request):
    """Send the logged-in user into a free Bridge room, creating a new
    room (with its chat) when none is empty."""
    if not self.request.user.is_authenticated():
        return redirect('mainPage:index')
    elif self.request.user.is_authenticated():
        u = get_object_or_404(BridgeOnline, user=self.request.user)
        # Already in a game or queue: send the user back to their room.
        if u.playing or u.readyToPlay or u.wantToPlay:
            return redirect('games:bridgeJoinGame', room_id=u.room.id)
        try:
            # First room with no players currently in it.
            room = BridgeRoom.objects.annotate(Count('bridgeonline', distinct=True)).filter(bridgeonline__count=0)[:1].get()
        except BridgeRoom.DoesNotExist:
            room = BridgeRoom()
            room.save()
            chat = BridgeChat()
            chat.room = room
            chat.save()
        u.room = room
        u.save()
        return redirect('games:bridgeJoinGame', room_id=room.id)
def get_activity(self, limit=20, offset=0, distinct=False, friends_only=False, request=None):
    """Return recent posts from this user's friends and followed users
    (plus the user's own posts unless friends_only), newest first.
    """
    #Todo: make distinct work; combine friends and following, but then get posts from them
    friends = Friendship.get_friendships(self, 0)
    friend_ids = []
    for friend in friends:
        friend_ids.append(friend.other(self))
    follows = self.follow_source.filter().values_list('target', flat=True)
    if not friends_only:
        friend_ids.append(self.id)
    for thing in follows:
        friend_ids.append(thing)
    # NOTE(review): if request is None or the user is anonymous, has_yeah
    # is undefined below and this raises -- confirm callers always pass an
    # authenticated request.
    if request.user.is_authenticated:
        has_yeah = Yeah.objects.filter(post=OuterRef('id'), by=request.user.id)
    if distinct:
        # Keep only each creator's most recent post.
        posts = Post.objects.select_related('creator').select_related('community').annotate(num_yeahs=Count('yeah', distinct=True), num_comments=Count('comment', distinct=True), yeah_given=Exists(has_yeah, distinct=True)).annotate(max_created=Max('creator__post__created')).filter(created=F('max_created')).filter(creator__in=friend_ids).order_by('-created')[offset:offset + limit]
    else:
        posts = Post.objects.select_related('creator').select_related('community').annotate(num_yeahs=Count('yeah', distinct=True), num_comments=Count('comment', distinct=True), yeah_given=Exists(has_yeah, distinct=True)).filter(creator__in=friend_ids).order_by('-created')[offset:offset + limit]
    if request:
        for post in posts:
            post.setup(request)
            # NOTE(review): this shadows the bound method with its result
            # on the instance; later calls to post.recent_comment() would
            # fail -- confirm this is intended for template access.
            post.recent_comment = post.recent_comment()
    return posts
def get_comments(self, request=None, limit=0, offset=0):
    """Return this post's comments ordered oldest-first, optionally sliced.

    When the requesting user is authenticated, each comment also carries a
    ``yeah_given`` annotation (whether that user has yeah'd it) alongside
    ``num_yeahs``. When *request* is given, each comment is set up for it.
    """
    qs = self.comment_set.select_related('creator')
    if request.user.is_authenticated:
        has_yeah = Yeah.objects.filter(comment=OuterRef('id'), by=request.user.id)
        qs = qs.annotate(num_yeahs=Count('yeah'), yeah_given=Exists(has_yeah))
    else:
        qs = qs.annotate(num_yeahs=Count('yeah'))
    qs = qs.filter(original_post=self).order_by('created')
    # Apply the same slicing semantics as before: limit wins over offset-only.
    if limit:
        comments = qs[offset:offset + limit]
    elif offset:
        comments = qs[offset:]
    else:
        comments = qs
    if request:
        for post in comments:
            post.setup(request)
    return comments
def queryset(self, request, queryset):
    """
    Returns the filtered queryset based on the value provided in the query
    string and retrievable via `self.value()`.
    """
    annotated = queryset.annotate(
        num_photos=models.Count("productphoto")
    )
    choice = self.value()
    if choice == "zero":
        return annotated.filter(num_photos=0)
    if choice == "one":
        return annotated.filter(num_photos=1)
    if choice == "many":
        return annotated.filter(num_photos__gte=2)
    # No recognized filter value: return the annotated queryset unchanged.
    return annotated
def getLocationPerformance(startDate=None, endDate=None):
    """Per-location counts of events and of non-drop-in, non-cancelled
    registrations, optionally limited to an event start-time window.

    Returns {location_name: {'series': n, 'registrations': n}}.
    """
    timeFilters = {}
    if startDate:
        timeFilters['event__startTime__gte'] = startDate
    if endDate:
        timeFilters['event__startTime__lte'] = endDate
    seriesCounts = list(Location.objects.values_list('name').filter(**timeFilters).distinct().annotate(Count('event')))
    # Narrow the same window to real registrations only.
    timeFilters.update({
        'event__eventregistration__dropIn': False,
        'event__eventregistration__cancelled': False
    })
    eventRegistrationCounts = list(Location.objects.values_list('name').filter(**timeFilters).distinct().annotate(Count('event')))
    results = {}
    for list_item in seriesCounts:
        results[list_item[0]] = {'series': list_item[1]}
    for list_item in eventRegistrationCounts:
        # NOTE(review): assumes every location in the second query also
        # appeared in the first; otherwise this raises KeyError.
        results[list_item[0]].update({'registrations': list_item[1]})
    return results
def forward(apps, schema_editor):
    """Data migration: backfill reply_count, shares_count and local on
    Content rows that have children, shares, or a local author."""
    Content = apps.get_model("content", "Content")
    qs = Content.objects.annotate(child_count=Count("children"), share_count=Count("shares")).filter(
        Q(child_count__gt=0) | Q(share_count__gt=0) | Q(author__user__isnull=False))
    for content in qs:
        # Replies
        # Replies to the content's shares count as replies too.
        share_ids = Content.objects.filter(share_of=content).values_list("id", flat=True)
        reply_count = content.children.count() + Content.objects.filter(parent_id__in=share_ids).count()
        # Shares
        shares_count = content.shares.count()
        # Local
        local = True if content.author.user else False
        # update() avoids triggering model save() side effects.
        Content.objects.filter(id=content.id).update(
            reply_count=reply_count,
            shares_count=shares_count,
            local=local,
        )
def bought(self):
    """Units of this product sold since midnight of its start_date."""
    # @INCOMPLETE: If it's an unlimited item we just don't care about the
    # bought count - Jesper 27/09-2017
    if self.start_date is None:
        return 0
    sales_since_start = self.sale_set.filter(
        timestamp__gt=date_to_midnight(self.start_date))
    return sales_since_start.aggregate(bought=Count("id"))["bought"]
def _sales_to_user_in_period(username, start_date, end_date, product_list, product_dict):
    """Count one user's purchases per product name within a time period.

    Returns a dict keyed by the names in *product_dict*, defaulting to 0
    for products the user did not buy.
    """
    counted = (
        Product.objects
        .filter(
            sale__member__username__iexact=username,
            id__in=product_list,
            sale__timestamp__gte=start_date,
            sale__timestamp__lte=end_date)
        .annotate(cnt=Count("id"))
        .values_list("name", "cnt")
    )
    bought = dict(counted)
    return {name: bought.get(name, 0) for name in product_dict}
def daily(request):
    """Admin report: recent sales, today's top products, 24h/30-day
    revenue, and the month's top categories.

    NOTE: the template is fed via locals(), so every local name below is
    part of the template contract -- don't rename casually.
    """
    current_date = timezone.now().replace(hour=0, minute=0, second=0)
    latest_sales = (Sale.objects
                    .prefetch_related('product', 'member')
                    .order_by('-timestamp')[:7])
    top_today = (Product.objects
                 .filter(sale__timestamp__gt=current_date)
                 .annotate(Count('sale'))
                 .order_by('-sale__count')[:7])
    startTime_day = timezone.now() - datetime.timedelta(hours=24)
    revenue_day = (Sale.objects
                   .filter(timestamp__gt=startTime_day)
                   .aggregate(Sum("price"))
                   ["price__sum"]) or 0.0
    startTime_month = timezone.now() - datetime.timedelta(days=30)
    revenue_month = (Sale.objects
                     .filter(timestamp__gt=startTime_month)
                     .aggregate(Sum("price"))
                     ["price__sum"]) or 0.0
    top_month_category = (Category.objects
                          .filter(product__sale__timestamp__gt=startTime_month)
                          .annotate(sale=Count("product__sale"))
                          .order_by("-sale")[:7])
    return render(request, 'admin/stregsystem/report/daily.html', locals())
def sales_api(request):
    """JSON time series (last 30 days) of daily sale counts and revenue."""
    startTime_month = timezone.now() - datetime.timedelta(days=30)
    qs = (Sale.objects
          .filter(timestamp__gt=startTime_month)
          .annotate(day=TruncDay('timestamp'))
          .values('day')
          .annotate(c=Count('*'))
          .annotate(r=Sum('price'))
          )
    db_sales = {row["day"].date(): (row["c"], money(row["r"])) for row in qs}
    base = timezone.now().date()
    date_list = [base - datetime.timedelta(days=x) for x in range(0, 30)]
    sales_list = []
    revenue_list = []
    for date in date_list:
        # Days without any sales fall back to zero count and revenue.
        sales, revenue = db_sales.get(date, (0, 0))
        sales_list.append(sales)
        revenue_list.append(revenue)
    items = {
        "day": date_list,
        "sales": sales_list,
        "revenue": revenue_list,
    }
    return JsonResponse(items)
def sort_by_readable(user, category=None):
    """
    Sorts books by most readable criterion. Uses aggregate 'count' function.

    :param django.contrib.auth.models.User user: The request user.
    :param app.models.Category category: The category.

    :return: The list with sorted books.
    """
    books = []
    if category:
        filtered_books = Book.exclude_private_books(user, Book.objects.filter(id_category=category))
    else:
        filtered_books = Book.exclude_private_books(user, Book.objects.all())
    # NOTE(review): one aggregate query per book (N+1); consider a single
    # annotated queryset if this list grows large.
    for item in filtered_books:
        book_read_count = AddedBook.objects.filter(id_book=item).aggregate(Count('id_user'))
        book = {'id': item.id,
                'name': item.book_name,
                'author': item.id_author.author_name,
                'url': item.photo.url if item.photo else '',
                'read_count': book_read_count['id_user__count']}
        books.append(book)
    return sorted(books, key=lambda info: info['read_count'], reverse=True)

# ------------------------------------------------------------------------------------------------------------------