我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 django.db.models.Case()。
def getRegistrationTypesAveragesByYear():
    """Return per-year averages of registration indicator flags.

    For every distinct ``event__year`` among the event registrations, the
    registrations of that year are annotated with four 0/100 indicators and
    then averaged, so each average reads as a percentage.

    Returns:
        A list of dicts (ascending by year) with the keys ``Student``,
        ``Door``, ``DropIn``, ``Cancelled`` and ``year``.
    """
    all_registrations = EventRegistration.objects.all()
    years = sorted(
        row['event__year']
        for row in all_registrations.values('event__year').annotate(Count('event__year'))
    )

    def percent_flag(**condition):
        # 100 when the condition matches, 0 otherwise.
        return Case(When(then=100, **condition), default=0, output_field=IntegerField())

    results = []
    for year in years:
        flagged = all_registrations.filter(event__year=year).annotate(
            student=percent_flag(registration__student=True),
            door=percent_flag(registration__payAtDoor=False),
            droppedIn=percent_flag(dropIn=True),
            cancellation=percent_flag(cancelled=True),
        )
        results.append(flagged.aggregate(
            Student=Avg('student'),
            Door=Avg('door'),
            DropIn=Avg('droppedIn'),
            Cancelled=Avg('cancellation'),
            year=Min('event__year'),
        ))
    return results
def get_user_languages(self, language_preferences):
    """Return all languages ordered from best to worst match.

    Every language stays in the queryset; only the ordering changes.

    Scoring: a language matching a preference gets a score that decreases
    with its position in ``language_preferences`` (the last preference gets
    0). The default language gets -1 unless it already matched a preference,
    and every other language gets -2. Results are ordered by descending
    score, then by ``order``.
    """
    # One clause per preferred language; reversing gives the first
    # preference the highest score.
    ranked = [
        When(code__iexact=code, then=Value(score))
        for score, code in enumerate(reversed(language_preferences))
    ]
    # The default language ranks just below every explicit preference.
    ranked.append(When(is_default=True, then=Value(-1)))

    score = Case(
        *ranked,
        default=Value(-2),  # Everything else gets -2
        output_field=models.IntegerField(null=True)
    )
    return self.annotate(score=score).order_by('-score', 'order')
def serve(self, request):
    """Redirect to the best-matching live translation below this page.

    Child pages are scored by the position of their language in the list
    returned by ``Language.objects.get_user_languages`` and the lowest score
    wins.

    Raises:
        Http404: if no child page exists in any of the candidate languages.
    """
    preferences = get_request_language_preference(request)
    languages = Language.objects.get_user_languages(preferences)

    candidates = (
        TranslatedPage.objects
        .live().specific()
        .child_of(self)
        .filter(language__in=languages)
        .annotate(language_score=Case(
            *[
                When(language=language, then=Value(rank))
                for rank, language in enumerate(languages)
            ],
            default=Value(None),
            output_field=models.IntegerField(null=True)))
        .order_by('language_score')
    )

    best = candidates.first()
    if not best:
        # No translation was found, not even in the default language.
        raise Http404
    # Redirect to the best translation.
    return redirect(best.url)
def status_summary(self):
    """Return a count of sessions per derived status label.

    The label is computed from three state columns:

    * ``provisioning_state == 2``: look at ``admin_state``;
      if it is 2 the label is ``'Up'`` when ``operational_state == 6``
      and ``'Down'`` otherwise; any other admin state gives ``'Admin Down'``.
    * ``provisioning_state == 1``: ``'Provisioning'``.
    * anything else: ``'None'``.

    Returns:
        A values queryset of ``{'label': ..., 'value': <count>}`` rows.
    """
    # Innermost decision first, so each level reads on its own.
    operational_label = models.Case(
        models.When(operational_state=6, then=models.Value('Up')),
        default=models.Value('Down')
    )
    admin_label = models.Case(
        models.When(admin_state=2, then=operational_label),
        default=models.Value('Admin Down')
    )
    label = models.Case(
        models.When(provisioning_state=2, then=admin_label),
        models.When(provisioning_state=1, then=models.Value('Provisioning')),
        default=models.Value('None'),
        output_field=models.CharField()
    )

    sessions = super(PeeringSessionManager, self).get_queryset()
    return (
        sessions
        .annotate(label=label)
        .values('label')
        .annotate(value=models.Count('label'))
    )
def reload_reply(request):
    """Render the reply list of a board article (POST only).

    Reads the article id from ``request.POST['id']``. Replies with
    ``reply_id == 0`` are keyed by their own id and all others by
    ``reply_id``; the list is ordered by that key and then by id, which
    presumably keeps answers right after the reply they belong to.

    Any non-POST request gets the generic error response.
    """
    if request.method != 'POST':
        return error_to_response(request)

    article_id = request.POST['id']
    thread_key = Case(
        When(reply_id=0, then='id'),
        default='reply_id',
        output_field=IntegerField(),
    )
    replies = (
        Reply.objects
        .filter(article_id=article_id)
        .annotate(custom_order=thread_key)
        .order_by('custom_order', 'id')
    )
    article = get_object_or_404(Board, pk=article_id)

    context = {
        'user': request.user,
        'article_user': article.user,
        'replies': replies,
        'count': replies.count()
    }
    return render_to_response('boards/show_reply.html', context)
def reload_team_reply(request):
    """Render the reply list of a team article (POST only).

    Reads the article id from ``request.POST['id']``. Replies with
    ``reply_id == 0`` are keyed by their own id and all others by
    ``reply_id``; the list is ordered by that key and then by id.

    Any non-POST request gets the generic error response.
    """
    if request.method != 'POST':
        return error_to_response(request)

    article_id = request.POST['id']
    thread_key = Case(
        When(reply_id=0, then='id'),
        default='reply_id',
        output_field=IntegerField(),
    )
    replies = (
        TeamReply.objects
        .filter(article_id=article_id)
        .annotate(custom_order=thread_key)
        .order_by('custom_order', 'id')
    )
    article = get_object_or_404(Team, pk=article_id)

    context = {
        'user': request.user,
        'article_user': article.user,
        'replies': replies,
        'count': replies.count()
    }
    return render_to_response('teams/show_team_reply.html', context)
def show_reply(context, id):
    """Build the template context listing the replies of board article ``id``.

    Replies with ``reply_id == 0`` are keyed by their own id and all others
    by ``reply_id``; the list is ordered by that key and then by id.

    Returns:
        A dict with ``user``, ``article_user``, ``replies`` and ``count``.
    """
    thread_key = Case(
        When(reply_id=0, then='id'),
        default='reply_id',
        output_field=IntegerField(),
    )
    replies = (
        Reply.objects
        .filter(article_id=id)
        .annotate(custom_order=thread_key)
        .order_by('custom_order', 'id')
    )
    current_user = context['request'].user
    article = get_object_or_404(Board, pk=id)

    return {
        'user': current_user,
        'article_user': article.user,
        'replies': replies,
        'count': replies.count()
    }
def show_team_reply(context, id):
    """Build the template context listing the replies of team article ``id``.

    Replies with ``reply_id == 0`` are keyed by their own id and all others
    by ``reply_id``; the list is ordered by that key and then by id.

    Returns:
        A dict with ``user``, ``article_user``, ``replies`` and ``count``.
    """
    thread_key = Case(
        When(reply_id=0, then='id'),
        default='reply_id',
        output_field=IntegerField(),
    )
    replies = (
        TeamReply.objects
        .filter(article_id=id)
        .annotate(custom_order=thread_key)
        .order_by('custom_order', 'id')
    )
    current_user = context['request'].user
    article = get_object_or_404(Team, pk=id)

    return {
        'user': current_user,
        'article_user': article.user,
        'replies': replies,
        'count': replies.count()
    }
def show_comment(context, id):
    """Build the template context listing the visible comments of post ``id``.

    Only comments with status ``'1normal'`` are included. Comments with
    ``comment_id == 0`` are keyed by their own id and all others by
    ``comment_id``; the list is ordered by that key and then by id.

    Returns:
        A dict with ``user``, ``post_user`` (the author's username),
        ``comments`` and ``count``.
    """
    visible = Q(status='1normal')
    thread_key = Case(
        When(comment_id=0, then='id'),
        default='comment_id',
        output_field=IntegerField(),
    )
    comments = (
        Comment.objects
        .filter(post_id=id)
        .filter(visible)
        .annotate(custom_order=thread_key)
        .order_by('custom_order', 'id')
    )
    current_user = context['request'].user
    post = get_object_or_404(Blog, pk=id)

    return {
        'user': current_user,
        'post_user': post.user.username,
        'comments': comments,
        'count': comments.count()
    }
def viral_video_detail(request, id):
    """Show one viral video and bump its impression counter.

    The video is annotated with ``total_impressions`` (desktop + mobile) and
    a ``label``: ``"popular"`` above ``POPULAR_FROM`` impressions, ``"new"``
    when created after yesterday's date, ``"cool"`` otherwise. The mobile or
    desktop counter is incremented in the database depending on
    ``request.flavour`` before the object is fetched.
    """
    yesterday = datetime.date.today() - datetime.timedelta(days=1)

    label = models.Case(
        models.When(total_impressions__gt=POPULAR_FROM,
                    then=models.Value("popular")),
        models.When(created__gt=yesterday, then=models.Value("new")),
        default=models.Value("cool"),
        output_field=models.CharField(),
    )
    qs = ViralVideo.objects.annotate(
        total_impressions=(models.F("desktop_impressions") +
                           models.F("mobile_impressions")),
        label=label,
    )

    # DEBUG: check the SQL query that Django ORM generates
    print(qs.query)

    qs = qs.filter(pk=id)
    # Increment in SQL so concurrent requests do not overwrite each other.
    if request.flavour == "mobile":
        counter = "mobile_impressions"
    else:
        counter = "desktop_impressions"
    qs.update(**{counter: models.F(counter) + 1})

    video = get_object_or_404(qs)
    return render(request, "viral_videos/viral_video_detail.html",
                  {'video': video})
def get_queryset(self):
    """Return budget history totals with a calculated service area.

    Rows are grouped by fiscal year, service area code, bureau code and
    bureau name. ``sa_calced`` is ``'LA'`` or ``'EO'`` for bureaus listed in
    ``LA_Bureaus`` / ``EO_Bureaus`` and the row's own ``service_area_code``
    otherwise (some bureaus are in service areas not reflected by the data).
    """
    grouping = ('fiscal_year', 'service_area_code', 'bureau_code', 'bureau_name')
    service_area = Case(
        When(bureau_code__in=LA_Bureaus, then=Value('LA')),
        When(bureau_code__in=EO_Bureaus, then=Value('EO')),
        default='service_area_code',
        output_field=CharField()
    )
    history = models.BudgetHistory.objects.all()
    return (
        history
        .values(*grouping)
        .annotate(sa_calced=service_area, amount=Sum('amount'))
        .order_by(*grouping)
    )
def get_queryset(self):
    """Return budget history totals per fiscal year and calculated service area.

    ``sa_calced`` is ``'LA'`` or ``'EO'`` for bureaus listed in
    ``LA_Bureaus`` / ``EO_Bureaus`` and the row's own ``service_area_code``
    otherwise (some bureaus are in service areas not reflected by the data).
    """
    service_area = Case(
        When(bureau_code__in=LA_Bureaus, then=Value('LA')),
        When(bureau_code__in=EO_Bureaus, then=Value('EO')),
        default='service_area_code',
        output_field=CharField()
    )
    history = models.BudgetHistory.objects.all()
    return (
        history
        .values('fiscal_year')
        .annotate(sa_calced=service_area, amount=Sum('amount'))
        .order_by('fiscal_year', 'sa_calced')
    )
def popularVouchersJSON(request):
    """Return the ten most used vouchers as JSON.

    Optional ``startDate`` / ``endDate`` GET parameters restrict which
    voucher uses are counted (by their ``creationDate``). Vouchers without
    any counted use are left out.

    Returns:
        A ``JsonResponse`` holding a list of ``{'name', 'counter'}`` dicts,
        highest counter first.
    """
    startDate = getDateTimeFromGet(request, 'startDate')
    endDate = getDateTimeFromGet(request, 'endDate')

    in_window = Q(voucheruse__creationDate__isnull=False)
    if startDate:
        in_window = in_window & Q(voucheruse__creationDate__gte=startDate)
    if endDate:
        in_window = in_window & Q(voucheruse__creationDate__lte=endDate)

    # The Case has no default, so uses outside the window are not counted.
    use_count = Count(Case(When(in_window, then=1), output_field=IntegerField()))
    ranked = (
        Voucher.objects
        .annotate(counter=use_count)
        .filter(counter__gt=0)
        .values('name', 'counter')
        .order_by('-counter')
    )
    return JsonResponse(list(ranked[:10]), safe=False)
def popularDiscountsJSON(request):
    """Return the ten most used discount combos as JSON.

    Optional ``startDate`` / ``endDate`` GET parameters restrict which
    registration discounts are counted (by the registration's ``dateTime``).
    Discounts without any counted use are left out.

    Returns:
        A ``JsonResponse`` holding a list of ``{'name', 'counter'}`` dicts,
        highest counter first.
    """
    startDate = getDateTimeFromGet(request, 'startDate')
    endDate = getDateTimeFromGet(request, 'endDate')

    in_window = Q(registrationdiscount__registration__dateTime__isnull=False)
    if startDate:
        in_window = in_window & Q(registrationdiscount__registration__dateTime__gte=startDate)
    if endDate:
        in_window = in_window & Q(registrationdiscount__registration__dateTime__lte=endDate)

    # The Case has no default, so uses outside the window are not counted.
    use_count = Count(Case(When(in_window, then=1), output_field=IntegerField()))
    ranked = (
        DiscountCombo.objects
        .annotate(counter=use_count)
        .filter(counter__gt=0)
        .values('name', 'counter')
        .order_by('-counter')
    )
    return JsonResponse(list(ranked[:10]), safe=False)
def registrations_matching_complex_pattern():
    """Print how many registrations match any of a list of predicates.

    Shows that ``Q`` objects can be generated from data: each
    ``(lookup, value)`` pair becomes one ``Q`` and all of them are combined
    with ``OR``.
    """
    predicates = [
        ('event__name__endswith', 'python'),
        ('member__community__name__contains', 'python')
    ]
    pattern = reduce(OR, [Q(predicate) for predicate in predicates])
    matches = Registration.objects.filter(pattern).count()
    print("{nbr} match the pattern 'python'".format(nbr=matches))


## When(), Case() expressions ##############################
def earlier_registration_discount(event_id):
    """Set the discount of every registration of an event by how early it was made.

    Registered at least 4 weeks before the event start: 15; at least
    3 weeks: 10; at least 2 weeks: 5; otherwise 0. Prints a message instead
    of raising when the event does not exist.
    """
    try:
        start = Event.objects.values('start').get(id=event_id)['start']
        # (weeks before start, discount); earliest tier first because the
        # first matching WHEN wins.
        tiers = ((4, 15), (3, 10), (2, 5))
        Registration.objects.filter(event_id=event_id).update(
            discount=Case(
                *[
                    When(Q(registered_on__lte=start - timedelta(weeks=weeks)),
                         then=Value(discount))
                    for weeks, discount in tiers
                ],
                default=Value(0)
            ))
        # >>> UPDATE registration SET discount = CASE
        #   WHEN registration.registered_on <= '2016-07-15 18:00:00' THEN 15
        #   WHEN registration.registered_on <= '2016-07-22 18:00:00' THEN 10
        #   WHEN registration.registered_on <= '2016-07-29 18:00:00' THEN 5
        #   ELSE 0
        # END WHERE registration.event_id = 3
    except Event.DoesNotExist:
        print("Insert valid event ID")
def test_polymorphic__complex_aggregate(self):
    """Check complex expressions in aggregate() (annotate() should behave alike)."""
    Model2A.objects.create(field1='A1')
    for _ in range(2):
        Model2B.objects.create(field1='A1', field2='B2')

    # Aggregate using **kwargs.
    counts = Model2A.objects.aggregate(
        cnt_a1=Count(Case(When(field1='A1', then=1))),
        cnt_b2=Count(Case(When(Model2B___field2='B2', then=1))),
    )
    self.assertEqual(counts, {'cnt_b2': 2, 'cnt_a1': 3})

    # Aggregate using *args: a default alias has to be set, or Django will
    # not accept a complex expression in aggregate()/annotate().
    def complex_agg(expression):
        aggregate = Count(expression) * 10
        aggregate.default_alias = 'complexagg'
        return aggregate

    expected_message = 'PolymorphicModel: annotate()/aggregate(): ___ model lookup supported for keyword arguments only'
    with self.assertRaisesMessage(AssertionError, expected_message):
        Model2A.objects.aggregate(complex_agg('Model2B___field2'))
def to_queryset(self, keep_order=True):
    """Return a Django queryset for the hits of this Elasticsearch search.

    Evaluating the returned queryset costs one query to the SQL database.

    Args:
        keep_order: when true, order the queryset like the search hits.
    """
    search = self
    # Do not query again if the ES result is already cached; otherwise only
    # the meta fields carrying the model ids are needed.
    if not hasattr(self, '_response'):
        search = self.source(excludes=['*'])

    pks = [hit._id for hit in search]
    queryset = self._model.objects.filter(pk__in=pks)
    if not keep_order:
        return queryset

    # Map every pk to its position in the hit list and sort on that.
    hit_position = Case(
        *[When(pk=pk, then=position) for position, pk in enumerate(pks)]
    )
    return queryset.order_by(hit_position)
def reorder(_default=AFTER, _reverse=False, **kwargs):
    """Build an ``order_by()`` expression that follows a given value order.

    Exactly one keyword argument is expected: ``<fieldname>=<sequence>``.
    Rows are sorted by the position of their field value in the sequence.

    Args:
        _default: sort key for values not in the sequence. ``BEFORE`` maps
            to -1, ``AFTER`` to ``len(sequence)``; anything else is used
            as given.
        _reverse: return a descending instead of an ascending ordering.

    Raises:
        TypeError: if the number of keyword arguments is not exactly one.
    """
    if len(kwargs) != 1:
        raise TypeError("reorder() takes one non-optional keyword argument")
    fieldname, new_order = next(iter(kwargs.items()))

    if _default is BEFORE:
        _default = -1
    elif _default is AFTER:
        _default = len(new_order)

    positions = [
        When(**{fieldname: value, 'then': index})
        for index, value in enumerate(new_order)
    ]
    expression = Case(*positions, default=Value(_default),
                      output_field=IntegerField())
    return expression.desc() if _reverse else expression.asc()
def order_by_amount_raised(self):
    """Order by the summed value of paid, non-refunded investments, highest first.

    Each matching investment contributes ``num_shares * value_per_share``;
    every other joined row contributes 0. The sum is exposed as
    ``amount_raised``.
    """
    investment_value = (
        models.F('project__campaign__investment__num_shares') *
        models.F('project__campaign__value_per_share')
    )
    counted_value = models.Case(
        models.When(
            project__campaign__investment__charge__paid=True,
            project__campaign__investment__charge__refunded=False,
            then=investment_value,
        ),
        default=0,
        output_field=models.IntegerField()
    )
    annotated = self.annotate(amount_raised=models.Sum(counted_value))
    return annotated.order_by('-amount_raised')
def pre_filter(self, queryset, user):
    ''' Returns the items from queryset that are inside their time window
    right now and whose stock limit has not been reached yet. '''
    now = timezone.now()

    # Keep items with no start time, or whose start time has passed.
    has_started = Q(start_time=None) | Q(start_time__lte=now)
    # Keep items with no end time, or whose end time is still ahead.
    not_ended = Q(end_time=None) | Q(end_time__gte=now)
    queryset = queryset.filter(has_started).filter(not_ended)

    # Filter out items that have been reserved beyond the limits; items
    # without a limit get a large stand-in remainder.
    reserved = self._calculate_quantities(user)
    remainder = Case(
        When(limit=None, then=Value(_BIG_QUANTITY)),
        default=F("limit") - Sum(reserved),
    )
    return queryset.annotate(remainder=remainder).filter(remainder__gt=0)
def channel_new_messages_annotation(user):
    """Build an aggregate counting a channel's new messages for ``user``.

    The queryset must already be annotated with ``channel_last_read`` for
    this to work. A target action counts when its actor is not ``user``, it
    compares greater than ``channel_last_read`` and its verb is SEND or
    UPLOAD.

    :param user: the user the count is computed for
    :return: a ``Sum`` expression to pass to ``annotate()``
    """
    from_others = ~Q(action_targets__actor_object_id=user.id)
    after_last_read = Q(action_targets__gt=F('channel_last_read'))
    is_message = Q(action_targets__verb__in=[verbs.SEND, verbs.UPLOAD])

    counts_as_new = Case(
        When(from_others & after_last_read & is_message, then=1),
        default=0,
        output_field=IntegerField()
    )
    return Sum(counts_as_new)
def annotate_channel_queryset_with_latest_activity_at(queryset, user):
    """Annotate channels with ``latest_activity_at``.

    ``latest_activity_timestamp`` is the newest target action timestamp.
    ``latest_activity_at`` equals it when it exists and is later than
    ``created_at``; otherwise it is ``created_at``. ``user`` is not used.
    """
    latest_activity_at = Case(
        When(latest_activity_timestamp__isnull=True, then='created_at'),
        When(
            latest_activity_timestamp__gt=F('created_at'),
            then='latest_activity_timestamp'
        ),
        default='created_at',
        output_field=DateTimeField()
    )
    with_timestamp = queryset.annotate(
        latest_activity_timestamp=Max('action_targets__timestamp'),
    )
    return with_timestamp.annotate(latest_activity_at=latest_activity_at)
def get_content(self):
    """Get a queryset of Content objects in the order of the content ids.

    Returns:
        A ``(content, throughs)`` tuple, where ``throughs`` is passed along
        unchanged from ``get_content_ids()``.
    """
    ids, throughs = self.get_content_ids()
    # Case/When tip thanks to https://stackoverflow.com/a/37648265/1489738
    id_position = Case(
        *[When(id=content_id, then=position)
          for position, content_id in enumerate(ids)]
    )
    content = (
        Content.objects
        .filter(id__in=ids)
        .select_related("author__user", "share_of")
        .prefetch_related("tags")
        .order_by(id_position)
    )
    return content, throughs
def average_for_views(self):
    """Return the average satisfaction and row count per ``view_name``.

    ``satisfied`` True counts as 1 and False as 0; any other value matches
    neither branch, and the Case has no default.
    """
    satisfaction = Case(
        When(satisfied=True, then=Value(1)),
        When(satisfied=False, then=Value(0)),
        output_field=IntegerField(),
    )
    per_view = self.values('view_name').annotate(
        average=Avg(satisfaction),
        count=Count('view_name'),
    )
    return per_view.order_by('view_name')
def get_verified(self, instance):
    """Annotate the instance's queryset with ``verified`` and intersect on it.

    ``verified`` is False when ``verified_by_id`` is NULL and True
    otherwise. The annotated queryset is stored back into
    ``instance['queryset']`` before delegating to ``_intersection``.
    """
    is_verified = Case(
        When(verified_by_id__isnull=True, then=False),
        default=True,
        output_field=BooleanField()
    )
    instance['queryset'] = instance['queryset'].annotate(verified=is_verified)
    return self._intersection(instance, 'verified')
def get_queryset(self):
    """Annotate the base queryset with distinct talk counts per acceptance state.

    Adds ``accepted_talk_count`` (``talk__accepted`` True),
    ``pending_talk_count`` (None) and ``refused_talk_count`` (False).
    """
    def talk_count(accepted):
        # Count distinct talk pks whose acceptance matches; the Case has no
        # default, so other talks are not counted.
        return Count(
            Case(
                When(talk__accepted=accepted, then='talk__pk'),
                output_field=models.IntegerField(),
            ),
            distinct=True,
        )

    return super().get_queryset().annotate(
        accepted_talk_count=talk_count(True),
        pending_talk_count=talk_count(None),
        refused_talk_count=talk_count(False),
    )
def delete_reply(request):
    """Mark a board reply as deleted or hidden, then re-render the reply list.

    POST only; the reply id comes from ``request.POST['id']``. The reply's
    own user sets status ``'6deleted'``, a staff user sets ``'5hidden'``,
    and anyone else gets the error response, as do non-POST requests.
    """
    if request.method != 'POST':
        return error_to_response(request)

    reply = get_object_or_404(Reply, pk=request.POST['id'])
    if request.user == reply.user:
        reply.status = '6deleted'
    elif request.user.is_staff:
        reply.status = '5hidden'
    else:
        return error_to_response(request)
    reply.save()

    article_id = reply.article_id
    thread_key = Case(
        When(reply_id=0, then='id'),
        default='reply_id',
        output_field=IntegerField(),
    )
    replies = (
        Reply.objects
        .filter(article_id=article_id)
        .annotate(custom_order=thread_key)
        .order_by('custom_order', 'id')
    )
    article = get_object_or_404(Board, pk=article_id)

    context = {
        'user': request.user,
        'article_user': article.user,
        'replies': replies,
        'count': replies.count()
    }
    return render_to_response('boards/show_reply.html', context)
def restore_reply(request):
    """Restore a board reply to ``'1normal'``, then re-render the reply list.

    POST only; the reply id comes from ``request.POST['id']``. Only staff
    users may restore; everyone else gets the error response, as do
    non-POST requests.
    """
    if request.method != 'POST':
        return error_to_response(request)

    reply = get_object_or_404(Reply, pk=request.POST['id'])
    if not request.user.is_staff:
        return error_to_response(request)
    reply.status = '1normal'
    reply.save()

    article_id = reply.article_id
    thread_key = Case(
        When(reply_id=0, then='id'),
        default='reply_id',
        output_field=IntegerField(),
    )
    replies = (
        Reply.objects
        .filter(article_id=article_id)
        .annotate(custom_order=thread_key)
        .order_by('custom_order', 'id')
    )
    article = get_object_or_404(Board, pk=article_id)

    context = {
        'user': request.user,
        'article_user': article.user,
        'replies': replies,
        'count': replies.count()
    }
    return render_to_response('boards/show_reply.html', context)
def delete_team_reply(request):
    """Mark a team reply as deleted or hidden, then re-render the reply list.

    POST only; the reply id comes from ``request.POST['id']``. The reply's
    own user sets status ``'6deleted'``, a staff user sets ``'5hidden'``,
    and anyone else gets the error response, as do non-POST requests.
    """
    if request.method != 'POST':
        return error_to_response(request)

    reply = get_object_or_404(TeamReply, pk=request.POST['id'])
    if request.user == reply.user:
        reply.status = '6deleted'
    elif request.user.is_staff:
        reply.status = '5hidden'
    else:
        return error_to_response(request)
    reply.save()

    article_id = reply.article_id
    thread_key = Case(
        When(reply_id=0, then='id'),
        default='reply_id',
        output_field=IntegerField(),
    )
    replies = (
        TeamReply.objects
        .filter(article_id=article_id)
        .annotate(custom_order=thread_key)
        .order_by('custom_order', 'id')
    )
    article = get_object_or_404(Team, pk=article_id)

    context = {
        'user': request.user,
        'article_user': article.user,
        'replies': replies,
        'count': replies.count()
    }
    return render_to_response('teams/show_team_reply.html', context)
def delete_comment(request):
    """Mark a blog comment as deleted, then re-render the comment list.

    POST only; the comment id comes from ``request.POST['id']``. The
    comment's author (matched by username) or a staff user sets status
    ``'6deleted'`` and the post's ``comment_count`` is decremented. Anyone
    else gets the error response, as do non-POST requests.

    The re-rendered list holds the post's ``'1normal'`` comments. Comments
    with ``comment_id == 0`` are keyed by their own id and all others by
    ``comment_id``, the same ordering ``show_comment`` uses for this
    template.
    """
    if request.method != 'POST':
        return error_to_response(request)

    comment = get_object_or_404(Comment, pk=request.POST['id'])
    if request.user.username == comment.userid or request.user.is_staff:
        comment.status = '6deleted'
    else:
        return error_to_response(request)
    comment.save()

    post_id = comment.post_id
    post = get_object_or_404(Blog, pk=post_id)
    post.comment_count -= 1
    post.save()

    # Key on comment_id, not post_id: post_id is the same for every row of
    # this queryset, so it cannot group comments into threads.
    thread_key = Case(
        When(comment_id=0, then='id'),
        default='comment_id',
        output_field=IntegerField(),
    )
    comments = (
        Comment.objects
        .filter(post_id=post_id)
        .filter(Q(status='1normal'))
        .annotate(custom_order=thread_key)
        .order_by('custom_order', 'id')
    )

    context = {
        'user': request.user,
        'post_user': post.user.username,
        'comments': comments,
        'count': comments.count()
    }
    return render_to_response('blogs/show_comment.html', context)
def reload_comment(request):
    """Render the visible comment list of a blog post (POST only).

    Reads the post id from ``request.POST['id']``. Only ``'1normal'``
    comments are listed. Comments with ``comment_id == 0`` are keyed by
    their own id and all others by ``comment_id``, the same ordering
    ``show_comment`` uses for this template.

    Any non-POST request gets the generic error response.
    """
    if request.method != 'POST':
        return error_to_response(request)

    post_id = request.POST['id']
    post = get_object_or_404(Blog, pk=post_id)

    # Key on comment_id, not post_id: post_id is the same for every row of
    # this queryset, so it cannot group comments into threads.
    thread_key = Case(
        When(comment_id=0, then='id'),
        default='comment_id',
        output_field=IntegerField(),
    )
    comments = (
        Comment.objects
        .filter(post_id=post_id)
        .filter(Q(status='1normal'))
        .annotate(custom_order=thread_key)
        .order_by('custom_order', 'id')
    )

    context = {
        'user': request.user,
        'post_user': post.user.username,
        'comments': comments,
        'count': comments.count()
    }
    return render_to_response('blogs/show_comment.html', context)
def main_country_priority(queryset, country_field='country'):
    """Order a queryset so rows of the main country come first, then by name.

    Rows whose ``country_field`` equals the ``Country`` with code
    ``MAIN_COUNTRY`` are annotated ``main_country=1``, all others 0.

    Args:
        queryset: the queryset to annotate and order.
        country_field: name of the lookup holding the row's country.
    """
    main_country = Country.objects.get(code=MAIN_COUNTRY)
    condition = {country_field: main_country, 'then': Value('1')}
    is_main = Case(
        When(**condition),
        default=Value('0'),
        output_field=IntegerField()
    )
    annotated = queryset.annotate(main_country=is_main)
    return annotated.order_by('-main_country', 'name')
def annotate_is_not_listed(self):
    """Annotate each row with ``is_not_listed``: True when its slug is ``'not_listed'``."""
    slug_is_placeholder = Case(
        When(slug='not_listed', then=True),
        default=False,
        output_field=BooleanField(),
    )
    return self.annotate(is_not_listed=slug_is_placeholder)
def annotate_bookmark(queryset, request=None):
    """Annotate a queryset with ``is_bookmarked`` for the requesting user.

    Used on search viewsets. Without a request, or for an unauthenticated
    user, every row gets a constant False. Otherwise ``is_bookmarked`` is
    the count of the row's bookmarks that belong to ``request.user``.
    """
    # NOTE(review): is_authenticated is called as a method here; confirm
    # this matches the Django version the project runs on.
    if not (request and request.user.is_authenticated()):
        return queryset.annotate(is_bookmarked=Value(False, BooleanField()))

    own_bookmark = Case(
        When(bookmarks__user=request.user, then=True),
        output_field=BooleanField()
    )
    return queryset.annotate(is_bookmarked=Count(own_bookmark))
def get_progress(self, target_user):
    """Return progress details of ``target_user`` for the viewed content node.

    The node is looked up from the view's ``content_node_id`` URL kwarg.

    * Topic node: a list with one dict per content kind, holding
      ``kind``, ``total_progress``, ``log_count_total`` and
      ``log_count_complete``. Kinds that occur among the topic's
      descendants but have no summary log get a zeroed entry.
    * Any other node: the user's summary logs for that content as
      ``kind`` / ``time_spent`` / ``total_progress`` rows, or a single
      zeroed entry when there are none.
    """
    content_node = ContentNode.objects.get(pk=self.context['view'].kwargs['content_node_id'])

    # progress details for a topic node and everything under it
    if content_node.kind == content_kinds.TOPIC:
        descendant_kinds = content_node.get_descendants().values_list('kind', flat=True).distinct()
        # evaluate queryset so we can add data for kinds that do not have logs
        topic_details = list(
            ContentSummaryLog.objects
            .filter_by_topic(content_node)
            .filter(user=target_user)
            .values('kind')
            .annotate(total_progress=Sum('progress'))
            .annotate(log_count_total=Count('pk'))
            .annotate(log_count_complete=Sum(
                Case(When(progress=1, then=1), default=0, output_field=IntegerField())))
        )

        # A QuerySet has no remove(), so track the kinds already reported
        # in a set and add a placeholder for each remaining kind once.
        reported_kinds = set(detail['kind'] for detail in topic_details)
        for kind in descendant_kinds:
            if kind in reported_kinds:
                continue
            reported_kinds.add(kind)
            topic_details.append({
                'kind': kind,
                'total_progress': 0.0,
                'log_count_total': 0,
                'log_count_complete': 0,
            })
        return topic_details

    # progress details for a leaf node (exercise, video, etc.)
    leaf_details = (
        ContentSummaryLog.objects
        .filter(user=target_user)
        .filter(content_id=content_node.content_id)
        .annotate(total_progress=F('progress'))
        .values('kind', 'time_spent', 'total_progress')
    )
    if leaf_details:
        return leaf_details
    return [{'kind': content_node.kind, 'time_spent': 0, 'total_progress': 0.0}]
def get_queryset(self, *args, **kwargs):
    """Return the event's submissions ordered by state and review score.

    ``has_override`` tells whether any review of the submission has an
    override vote. ``avg_score`` is the event's ``review_max_score + 1``
    for such submissions, and the average review score otherwise. The
    result is ordered by descending state, then descending ``avg_score``.
    """
    overridden_reviews = Review.objects.filter(
        override_vote__isnull=False,
        submission_id=models.OuterRef('pk'),
    )
    above_max_score = self.request.event.settings.review_max_score + 1
    avg_score = models.Case(
        models.When(has_override=True, then=above_max_score),
        default=models.Avg('reviews__score')
    )
    return (
        self.request.event.submissions
        .order_by('review_id')
        .annotate(has_override=models.Exists(overridden_reviews))
        .annotate(avg_score=avg_score)
        .order_by('-state', '-avg_score')
    )
def crew_report(request):
    """Render a per-user report of order lines grouped by item name.

    For every user, each item gets its summed price (``total``) and a
    ``number`` where lines with a positive price count +1 and all other
    lines count -1. The per-user ``total`` is the sum over all items.
    """
    credit_result = []
    for member in User.objects.all():
        lines = (
            OrderLine.objects
            .filter(order__user=member)
            .values('item__name')
            .annotate(
                total=Sum('price'),
                number=Sum(Case(When(price__gt=0, then=1), default=-1,
                                output_field=IntegerField())),
            )
        )
        member_total = sum(line['total'] for line in lines)
        credit_result.append({
            'card': member.card,
            'lines': lines,
            'name': member.first_name + ' ' + member.last_name,
            'total': member_total,
        })
    return render(request, 'pos/crew_report.djhtml', {'crew': credit_result})
def sale_overview(request):
    """Render sales per item, grouped by category, split by payment method.

    Order lines are aggregated per item and payment method (0 is read into
    the ``cash`` figures, 1 into ``credit``). Each item dict gets ``cash``,
    ``credit``, ``sold`` and ``total``; ``sold`` is sign-flipped for items
    with a negative price. Grand totals and all serialized shifts are
    passed to the template as well.
    """
    order_lines = (
        OrderLine.objects.all()
        .values('item__id', 'order__payment_method')
        .annotate(
            total=Sum('price'),
            sold=Sum(Case(When(price__gt=0, then=1), default=-1,
                          output_field=IntegerField())),
        )
    )
    items = Item.objects.all().values('name', 'category__name', 'id', 'price')

    def first_or_empty(lines, payment_method):
        # First aggregated row for the payment method, or zeroes if none.
        try:
            return lines.filter(order__payment_method=payment_method)[0]
        except IndexError:
            return {'sold': 0, 'total': 0}

    total = {'cash': 0, 'credit': 0, 'total': 0}
    overview = {}
    for item in items:
        item_lines = order_lines.filter(item_id=item['id'])
        credit = first_or_empty(item_lines, 1)
        cash = first_or_empty(item_lines, 0)

        item['cash'] = cash['total']
        item['credit'] = credit['total']
        item['sold'] = cash['sold'] + credit['sold']
        item['total'] = item['cash'] + item['credit']
        if item['price'] < 0:
            item['sold'] *= -1

        overview.setdefault(item['category__name'], []).append(item)

        for key in ('cash', 'credit', 'total'):
            total[key] += item[key]

    shifts = ShiftSerializer(Shift.objects.all(), many=True)
    print(shifts.data)
    return render(request, 'pos/sale_overview.djhtml',
                  {'overview': overview, 'shifts': shifts.data, 'total': total})
def toggle_active(self, request, queryset):
    """
    Inverts the ``is_active`` flag of chosen access rules.

    The flip happens in a single UPDATE: rows that are active become
    inactive and all others become active. Afterwards the user is told how
    many rules of the queryset are now active and how many are inactive.
    """
    queryset.update(
        is_active=models.Case(
            models.When(is_active=True, then=models.Value(False)),
            default=models.Value(True)))

    activated = queryset.filter(is_active=True).count()
    deactivated = queryset.filter(is_active=False).count()
    # Translate the template first and fill in the numbers afterwards;
    # formatting before the lookup would produce a string that matches no
    # msgid in the translation catalog.
    message = _('Activated {0} and deactivated {1} '
                'access rules.').format(activated, deactivated)
    self.message_user(request, message)
def get_queryset(self):
    """Annotate assets with ``scrapped``.

    ``scrapped`` sums 1 for every transfer log whose status is
    ``TransferLog.SCRAP`` and 0 for every other joined row; the Case is
    declared with a ``BooleanField`` output.
    """
    is_scrap_log = Case(
        When(transferlog__status=models.TransferLog.SCRAP, then=1),
        default=0,
        output_field=BooleanField()
    )
    base = super(AssetTableView, self).get_queryset()
    return base.annotate(scrapped=Sum(is_scrap_log))
def get_queryset(self):
    """Annotate the base queryset with ``ticket_count`` and ``sold_out``.

    ``ticket_count`` sums 1 per joined order item, except that items whose
    order is refunded or whose ``billed_total`` is empty or NULL count 0.
    ``sold_out`` is False while ``ticket_count`` is below ``capacity`` and
    True otherwise.
    """
    counted_ticket = models.Case(
        models.When(orderitem__order__refunded=True, then=0),
        models.When(orderitem__order__billed_total='', then=0),
        models.When(orderitem__order__billed_total__isnull=True, then=0),
        default=1,
        output_field=models.IntegerField(),
    )
    sold_out = models.Case(
        models.When(ticket_count__lt=F('capacity'), then=models.Value(False)),
        default=True,
        output_field=models.BooleanField(),
    )
    return (
        super().get_queryset()
        .annotate(ticket_count=models.Sum(counted_ticket))
        .annotate(sold_out=sold_out)
    )
def _rate_value_condition(self, value):
    """Build a Case yielding the rating id for ratings whose value is ``value``.

    There is no default branch, so other ratings match nothing.
    """
    matching_rating = models.When(
        ratings__value=value,
        then=models.F('ratings__id'),
    )
    return models.Case(matching_rating, output_field=models.IntegerField())
def order_by_num_investors(self):
    """Order by the number of distinct investing users, highest first.

    Only investments whose charge is paid and not refunded are counted; the
    count is exposed as ``num_investors``.
    """
    paying_user = models.Case(
        models.When(
            project__campaign__investment__charge__paid=True,
            project__campaign__investment__charge__refunded=False,
            then='project__campaign__investment__charge__customer__user'
        )
    )
    annotated = self.annotate(
        num_investors=models.Count(paying_user, distinct=True)
    )
    return annotated.order_by('-num_investors')
def with_likes(self):
    """Annotate each row with ``likes__count``, the number of ``'like'`` flags."""
    is_like = Case(
        When(bloglikes__flag='like', then=1),
        default=0,
        output_field=IntegerField(),
    )
    return self.annotate(likes__count=Sum(is_like))
def with_dislikes(self):
    """Annotate each row with ``dislikes__count``, the number of ``'dislike'`` flags."""
    is_dislike = Case(
        When(bloglikes__flag='dislike', then=1),
        default=0,
        output_field=IntegerField(),
    )
    return self.annotate(dislikes__count=Sum(is_dislike))
def with_likes_flag(self, user):
    """Annotate each row with ``likes__flag``, the given user's own vote.

    Each ``'like'`` by ``user`` adds 1 and each ``'dislike'`` adds -1; rows
    of other users add 0. For an unauthenticated user the queryset is
    returned unchanged, without the annotation.
    """
    if user.is_authenticated:
        own_vote = Case(
            When(bloglikes__user=user, bloglikes__flag='like', then=1),
            When(bloglikes__user=user, bloglikes__flag='dislike', then=-1),
            default=0,
            output_field=IntegerField(),
        )
        return self.annotate(likes__flag=Sum(own_vote))
    return self
def user_remainders(cls, user):
    '''
    Compute how much of each category the user may still take.

    Quantities of product items in the user's paid carts are summed and
    subtracted from the category's ``limit_per_user``; categories without
    a limit get 99999999.

    Return:
        Mapping[int->int]: A dictionary that maps the category ID to the
        user's remainder for that category.
    '''
    in_paid_cart = (
        Q(product__productitem__cart__user=user) &
        Q(product__productitem__cart__status=commerce.Cart.STATUS_PAID)
    )
    # Quantity of the item if it is in one of the user's paid carts, else 0.
    paid_quantity = Case(
        When(in_paid_cart, then='product__productitem__quantity'),
        default=Value(0),
    )
    remainder = Case(
        When(limit_per_user=None, then=Value(99999999)),
        default=F('limit_per_user') - Sum(paid_quantity),
    )

    categories = inventory.Category.objects.all().annotate(remainder=remainder)
    return {category.id: category.remainder for category in categories}
def _calculate_quantities(cls, user):
    """Build a Case yielding the reserved quantity of a joined product item.

    The item's quantity is used when it sits in one of the user's relevant
    carts, reached either through the flag's categories or through its
    products; every other row yields 0.
    """
    reserved_carts = cls._relevant_carts(user)

    # Category lines: the item's product belongs to one of the categories.
    via_category = (
        Q(categories=F('categories__product__productitem__product__category')) &
        Q(categories__product__productitem__cart__in=reserved_carts)
    )
    # Product lines: the item is for one of the products themselves.
    via_product = (
        Q(products=F('products__productitem__product')) &
        Q(products__productitem__cart__in=reserved_carts)
    )

    return Case(
        When(via_category, then="categories__product__productitem__quantity"),
        When(via_product, then="products__productitem__quantity"),
        default=Value(0),
    )
def _calculate_quantities(cls, user):
    """Build a Case yielding the quantity of a discount item in the user's relevant carts.

    Discount items in any other cart yield 0.
    """
    in_reserved_cart = When(
        discountitem__cart__in=cls._relevant_carts(user),
        then="discountitem__quantity",
    )
    return Case(in_reserved_cart, default=Value(0))