我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用django.db.models.When()。
def getRegistrationTypesAveragesByYear(): srs = EventRegistration.objects.all() eligible_years = [x['event__year'] for x in srs.values('event__year').annotate(Count('event__year'))] eligible_years.sort() year_averages = [] for year in eligible_years: this_year_results = srs.filter(event__year=year).annotate( student=Case(When(registration__student=True,then=100),default=0,output_field=IntegerField()), door=Case(When(registration__payAtDoor=False,then=100),default=0,output_field=IntegerField()), droppedIn=Case(When(dropIn=True,then=100),default=0,output_field=IntegerField()), cancellation=Case(When(cancelled=True,then=100),default=0,output_field=IntegerField()), ).aggregate(Student=Avg('student'),Door=Avg('door'),DropIn=Avg('droppedIn'),Cancelled=Avg('cancellation'),year=Min('event__year')) year_averages.append(this_year_results) return year_averages
def validate_email(self): """Ensure emails are unique across the models tracking emails. Since it's essential to keep email addresses unique to support our workflows, a `ValidationError` will be raised if the email trying to be saved is already assigned to some other user. """ lookup = Q(email__iexact=self.email) if self.pk is not None: # When there's an update, ensure no one else has this address lookup &= ~Q(user=self) try: EmailAddress.objects.get(lookup) except EmailAddress.DoesNotExist: pass else: raise ValidationError({ 'email': [_('This email address already exists.')] })
def get_user_languages(self, language_preferences): """ Get the best matching Languages for a list of preferences, in order from best to worst. All languages will appear in the queryset. """ # Scores: Matched languages will get a score depending upon their # preference - better languages will get a higher score, down to 0 for # the 'worst' preferenced language. The default language will get a # score or -1 unless it matched as a preference, and all other # languages will get a score of -2 # `When` clauses for all the preferred languages clauses = [ When(code__iexact=language, then=Value(i)) for i, language in enumerate(reversed(language_preferences))] # The default language gets a score of -1 clauses.append(When(is_default=True, then=Value(-1))) return self.annotate( score=Case( *clauses, default=Value(-2), # Everything else gets -2 output_field=models.IntegerField(null=True)) ).order_by('-score', 'order')
def serve(self, request): language_preferences = get_request_language_preference(request) languages = Language.objects.get_user_languages(language_preferences) candidate_pages = TranslatedPage.objects\ .live().specific()\ .child_of(self)\ .filter(language__in=languages)\ .annotate(language_score=Case( *[When(language=language, then=Value(i)) for i, language in enumerate(languages)], default=Value(None), output_field=models.IntegerField(null=True)))\ .order_by('language_score') translation = candidate_pages.first() if translation: # Redirect to the best translation return redirect(translation.url) else: # No translation was found, not even in the default language! Oh dear. raise Http404
def status_summary(self): """Get interface status summary.""" base_query_set = super(PeeringSessionManager, self).get_queryset() summary = base_query_set.annotate( label=models.Case( models.When(provisioning_state=2, then=models.Case( models.When(admin_state=2, then=models.Case( models.When(operational_state=6, then=models.Value('Up')), default=models.Value('Down') )), default=models.Value('Admin Down') )), models.When(provisioning_state=1, then=models.Value('Provisioning')), default=models.Value('None'), output_field=models.CharField() )).values('label').annotate(value=models.Count('label')) return summary
def setup_columns(self, *args, **kwargs): name_link_template = ''' <a href="{% url 'recipedetails' extra.pid data.pk %}">{{data.name}}</a> ''' self.add_column(title="Image recipe", help_text="When you build an image recipe, you get an " "image: a root file system you can" "deploy to a machine", hideable=False, orderable=True, static_data_name="name", static_data_template=name_link_template, field_name="name") super(ImageRecipesTable, self).setup_columns(*args, **kwargs) self.add_column(**RecipesTable.build_col)
def setup_columns(self, *args, **kwargs): name_link_template = ''' <a href="{% url 'recipedetails' extra.pid data.pk %}">{{data.name}}</a> ''' self.add_column(title="Image recipe", help_text="When you build an image recipe, you get an " "image: a root file system you can" "deploy to a machine", hideable=False, orderable=True, static_data_name="name", static_data_template=name_link_template, field_name="name") super(ImageRecipesTable, self).setup_columns(*args, **kwargs) self.add_column(title="Customise", hideable=False, filter_name="in_current_project", static_data_name="customise-or-add-recipe", static_data_template='{% include "customise_btn.html" %}')
def annotate_queryset(self, queryset, request): self.get_skills_and_causes(request) queryset = queryset\ .annotate(\ cause_relevance = Count( Case(When(causes__pk__in=[1,3], then=1), output_field=IntegerField()), distinct=True), skill_relevance = Count( Case(When(skills__pk__in=[1,2,4], then=1), output_field=IntegerField()), distinct=True))\ .annotate(relevance = F('cause_relevance') + F('skill_relevance')) return queryset
def get_friendships(user, limit=50, offset=0, latest=False, online_only=False): if not limit: return Friendship.objects.filter(Q(source=user) | Q(target=user)).order_by('-created') if latest: if online_only: delta = timezone.now() - timedelta(seconds=48) awman = [] for friend in Friendship.objects.filter(Q(source=user) | Q(target=user)).order_by('-latest')[offset:offset + limit]: if friend.other(user).last_login > delta: awman.append(friend) return awman # Fix all of this at some point # return Friendship.objects.filter( #source=When(source__ne=user, source__last_login__gt=delta), #target=When(target__ne=user, target__last_login__gt=delta) #).order_by('-latest')[offset:offset + limit] else: return Friendship.objects.filter(Q(source=user) | Q(target=user)).order_by('-latest')[offset:offset + limit] else: return Friendship.objects.filter(Q(source=user) | Q(target=user)).order_by('-created')[offset:offset + limit]
def viral_video_detail(request, id): yesterday = datetime.date.today() - datetime.timedelta(days=1) qs = ViralVideo.objects.annotate( total_impressions=models.F("desktop_impressions") + models.F("mobile_impressions"), label=models.Case( models.When(total_impressions__gt=POPULAR_FROM, then=models.Value("popular")), models.When(created__gt=yesterday, then=models.Value("new")), default=models.Value("cool"), output_field=models.CharField(), ), ) # DEBUG: check the SQL query that Django ORM generates print(qs.query) qs = qs.filter(pk=id) if request.flavour == "mobile": qs.update(mobile_impressions=models.F("mobile_impressions") + 1) else: qs.update(desktop_impressions=models.F("desktop_impressions") + 1) video = get_object_or_404(qs) return render(request, "viral_videos/viral_video_detail.html", {'video': video})
def getRegistrationReferralCounts(startDate,endDate): ''' When a user accesses the class registration page through a referral URL, the marketing_id gets saved in the extra JSON data associated with that registration. This just returns counts associated with how often given referral terms appear in a specified time window (i.e. how many people signed up by clicking through a referral button). ''' timeFilters = {} if startDate: timeFilters['dateTime__gte'] = startDate if endDate: timeFilters['dateTime__lt'] = endDate regs = Registration.objects.filter(**timeFilters) counter = Counter([x.data.get('marketing_id',None) for x in regs if isinstance(x.data,dict)] + [None for x in regs if not isinstance(x.data,dict)]) results = [{'code': k or _('None'), 'count': v} for k,v in counter.items()] return results
def registrations_matching_complex_pattern(): """ Q objects generation can be also automated """ predicate_list = [ ('event__name__endswith', 'python'), ('member__community__name__contains', 'python') ] q_object_list = [Q(predicate) for predicate in predicate_list] pattern = reduce(OR, q_object_list) registration_number = Registration.objects.filter(pattern).count() print("{nbr} match the pattern 'python'".format(nbr=registration_number)) ## When(), Case() expressions ##############################
def earlier_registration_discount(event_id): try: event = Event.objects.values('start').get(id=event_id) month_ago = event['start'] - timedelta(weeks=4) three_weeks_ago = event['start'] - timedelta(weeks=3) two_weeks_ago = event['start'] - timedelta(weeks=2) Registration.objects.filter(event_id=event_id).update( discount=Case( When(Q(registered_on__lte=month_ago), then=Value(15)), When(Q(registered_on__lte=three_weeks_ago), then=Value(10)), When(Q(registered_on__lte=two_weeks_ago), then=Value(5)), default=Value(0) )) # >>> UPDATE registration SET discount = CASE # WHEN registration.registered_on <= '2016-07-15 18:00:00' THEN 15 # WHEN registration.registered_on <= '2016-07-22 18:00:00' THEN 10 # WHEN registration.registered_on <= '2016-07-29 18:00:00' THEN 5 # ELSE 0 # END WHERE registration.event_id = 3 except Event.DoesNotExist as e: print("Insert valid event ID")
def test_polymorphic__complex_aggregate(self): """ test (complex expression on) aggregate (should work for annotate either) """ Model2A.objects.create(field1='A1') Model2B.objects.create(field1='A1', field2='B2') Model2B.objects.create(field1='A1', field2='B2') # aggregate using **kwargs result = Model2A.objects.aggregate( cnt_a1=Count(Case(When(field1='A1', then=1))), cnt_b2=Count(Case(When(Model2B___field2='B2', then=1))), ) self.assertEqual(result, {'cnt_b2': 2, 'cnt_a1': 3}) # aggregate using **args # we have to set the defaul alias or django won't except a complex expression # on aggregate/annotate def ComplexAgg(expression): complexagg = Count(expression)*10 complexagg.default_alias = 'complexagg' return complexagg with self.assertRaisesMessage(AssertionError, 'PolymorphicModel: annotate()/aggregate(): ___ model lookup supported for keyword arguments only'): Model2A.objects.aggregate(ComplexAgg('Model2B___field2'))
def to_queryset(self, keep_order=True): """ This method return a django queryset from the an elasticsearch result. It cost a query to the sql db. """ s = self # Do not query again if the es result is already cached if not hasattr(self, '_response'): # We only need the meta fields with the models ids s = self.source(excludes=['*']) pks = [result._id for result in s] qs = self._model.objects.filter(pk__in=pks) if keep_order: preserved_order = Case( *[When(pk=pk, then=pos) for pos, pk in enumerate(pks)] ) qs = qs.order_by(preserved_order) return qs
def reorder(_default=AFTER, _reverse=False, **kwargs): """ Return a database expression that can be used in an order_by() so that the queryset will be sorted according to the order of values given. """ if not 0 < len(kwargs) <= 1: raise TypeError("reorder() takes one non-optional keyword argument") fieldname, new_order = kwargs.popitem() if _default is BEFORE: _default = -1 elif _default is AFTER: _default = len(new_order) whens = [When(**{fieldname: v, 'then': i}) for i, v in enumerate(new_order)] casewhen = Case(*whens, default=Value(_default), output_field=IntegerField()) if _reverse: return casewhen.desc() else: return casewhen.asc()
def order_by_amount_raised(self): return self.annotate( amount_raised=models.Sum( models.Case( models.When( project__campaign__investment__charge__paid=True, project__campaign__investment__charge__refunded=False, then=( models.F('project__campaign__investment__num_shares') * models.F('project__campaign__value_per_share') ), ), default=0, output_field=models.IntegerField() ) ) ).order_by('-amount_raised')
def pre_filter(self, queryset, user): ''' Returns all of the items from queryset where the date falls into any specified range, but not yet where the stock limit is not yet reached.''' now = timezone.now() # Keep items with no start time, or start time not yet met. queryset = queryset.filter(Q(start_time=None) | Q(start_time__lte=now)) queryset = queryset.filter(Q(end_time=None) | Q(end_time__gte=now)) # Filter out items that have been reserved beyond the limits quantity_or_zero = self._calculate_quantities(user) remainder = Case( When(limit=None, then=Value(_BIG_QUANTITY)), default=F("limit") - Sum(quantity_or_zero), ) queryset = queryset.annotate(remainder=remainder) queryset = queryset.filter(remainder__gt=0) return queryset
def get_content(self): """Get queryset of Content objects. Keep ordering as returned by the list of content id's. """ ids, throughs = self.get_content_ids() # Case/When tip thanks to https://stackoverflow.com/a/37648265/1489738 preserved = Case(*[When(id=id, then=pos) for pos, id in enumerate(ids)]) content = Content.objects.filter(id__in=ids)\ .select_related("author__user", "share_of").prefetch_related("tags").order_by(preserved) return content, throughs
def average_for_views(self): case = Case(When(satisfied=True, then=Value(1)), When(satisfied=False, then=Value(0)), output_field=IntegerField()) return self.values('view_name').annotate(average=Avg(case), count=Count('view_name')).order_by('view_name')
def get_queryset(self): qs = super().get_queryset() qs = qs.annotate( accepted_talk_count=Count(Case(When(talk__accepted=True, then='talk__pk'), output_field=models.IntegerField()), distinct=True), pending_talk_count=Count(Case(When(talk__accepted=None, then='talk__pk'), output_field=models.IntegerField()), distinct=True), refused_talk_count=Count(Case(When(talk__accepted=False, then='talk__pk'), output_field=models.IntegerField()), distinct=True), ) return qs
def get_progress(self, target_user): content_node = ContentNode.objects.get(pk=self.context['view'].kwargs['content_node_id']) # progress details for a topic node and everything under it if content_node.kind == content_kinds.TOPIC: kinds = content_node.get_descendants().values_list('kind', flat=True).distinct() topic_details = ContentSummaryLog.objects \ .filter_by_topic(content_node) \ .filter(user=target_user) \ .values('kind') \ .annotate(total_progress=Sum('progress')) \ .annotate(log_count_total=Count('pk')) \ .annotate(log_count_complete=Sum(Case(When(progress=1, then=1), default=0, output_field=IntegerField()))) # evaluate queryset so we can add data for kinds that do not have logs topic_details = list(topic_details) for kind in topic_details: kinds.remove(kind['kind']) for kind in kinds: topic_details.append({'kind': kind, 'total_progress': 0.0, 'log_count_total': 0, 'log_count_complete': 0}) return topic_details else: # progress details for a leaf node (exercise, video, etc.) leaf_details = ContentSummaryLog.objects \ .filter(user=target_user) \ .filter(content_id=content_node.content_id) \ .annotate(total_progress=F('progress')) \ .values('kind', 'time_spent', 'total_progress') return leaf_details if leaf_details else [{'kind': content_node.kind, 'time_spent': 0, 'total_progress': 0.0}]
def get_queryset(self, *args, **kwargs): overridden_reviews = Review.objects.filter(override_vote__isnull=False, submission_id=models.OuterRef('pk')) return self.request.event.submissions\ .order_by('review_id')\ .annotate(has_override=models.Exists(overridden_reviews))\ .annotate(avg_score=models.Case( models.When( has_override=True, then=self.request.event.settings.review_max_score + 1, ), default=models.Avg('reviews__score') ))\ .order_by('-state', '-avg_score')
def crew_report(request): crew = User.objects.all() credit_result = [] for c in crew: items = OrderLine.objects.filter(order__user=c).values('item__name')\ .annotate(total=Sum('price'), number=Sum(Case(When(price__gt=0, then=1), default=-1, output_field=IntegerField()))) total = sum(map(lambda x: x['total'], items)) credit_result.append({'card': c.card, 'lines': items, 'name': c.first_name + ' ' + c.last_name, 'total': total}) return render(request, 'pos/crew_report.djhtml', {'crew': credit_result})
def sale_overview(request): order_lines = OrderLine.objects.all().values('item__id', 'order__payment_method')\ .annotate(total=Sum('price'), sold=Sum(Case(When(price__gt=0, then=1), default=-1, output_field=IntegerField()))) items = Item.objects.all().values('name', 'category__name', 'id', 'price') total = {'cash': 0, 'credit': 0, 'total': 0} overview = {} for item in items: per_payment_method = order_lines.filter(item_id=item['id']) try: credit = per_payment_method.filter(order__payment_method=1)[0] except IndexError: credit = {'sold': 0, 'total': 0} try: cash = per_payment_method.filter(order__payment_method=0)[0] except IndexError: cash = {'sold': 0, 'total': 0} item['cash'] = cash['total'] item['credit'] = credit['total'] item['sold'] = cash['sold'] + credit['sold'] item['total'] = item['cash'] + item['credit'] if item['price'] < 0: item['sold'] *= -1 if item['category__name'] in overview.keys(): overview[item['category__name']].append(item) else: overview[item['category__name']] = [item] total['cash'] += item['cash'] total['credit'] += item['credit'] total['total'] += item['total'] shifts = ShiftSerializer(Shift.objects.all(), many=True) print(shifts.data) return render(request, 'pos/sale_overview.djhtml', {'overview': overview, 'shifts': shifts.data, 'total': total})
def toggle_active(self, request, queryset): """ Inverts the ``is_active`` flag of chosen access rules. """ queryset.update( is_active=models.Case( models.When(is_active=True, then=models.Value(False)), default=models.Value(True))) self.message_user(request, _('Activated {0} and deactivated {1} ' 'access rules.'.format(queryset.filter(is_active=True).count(), queryset.filter(is_active=False).count())))
def get_queryset(self): return (Theme.objects .values('id', 'name') .annotate(poems_count=models.Count(models.Case(models.When(poem__is_shown=1, then=1)))))
def poets_with_count(self): return (Poet.objects .filter(age=self.id) .values('id', 'name', 'slug') .annotate(poems_count=models.Count(models.Case(models.When(poem__is_shown=1, then=1)))))
def poets_list_alphabetical(column_nb): poets = (Poet.objects .filter(is_active=1) .values('id', 'name', 'slug') .annotate(poems_count=models.Count(models.Case(models.When(poem__is_shown=1, then=1))))) return {'poets': poets, 'column_nb': column_nb}
def get_queryset(self): return super().get_queryset() \ .annotate(ticket_count=models.Sum( models.Case( models.When( orderitem__order__refunded=True, then=0 ), models.When( orderitem__order__billed_total='', then=0 ), models.When( orderitem__order__billed_total__isnull=True, then=0 ), default=1, output_field=models.IntegerField(), ))) \ .annotate(sold_out=models.Case( models.When( ticket_count__lt=F('capacity'), then=models.Value(False), ), default=True, output_field=models.BooleanField(), ))
def _rate_value_condition(self, value): return models.Case( models.When(ratings__value=value, then=models.F('ratings__id')), output_field=models.IntegerField() )
def order_by_num_investors(self): return self.annotate( num_investors=models.Count( models.Case( models.When( project__campaign__investment__charge__paid=True, project__campaign__investment__charge__refunded=False, then='project__campaign__investment__charge__customer__user' ) ), distinct=True ) ).order_by('-num_investors')
def with_likes(self): return self.annotate( likes__count=Sum(Case(When(bloglikes__flag='like', then=1), default=0, output_field=IntegerField())) )
def with_dislikes(self): return self.annotate( dislikes__count=Sum(Case(When(bloglikes__flag='dislike', then=1), default=0, output_field=IntegerField())) )
def with_likes_flag(self, user): if not user.is_authenticated: return self return self.annotate( likes__flag=Sum( Case(When(bloglikes__user=user, bloglikes__flag='like', then=1), When(bloglikes__user=user, bloglikes__flag='dislike', then=-1), default=0, output_field=IntegerField())) )
def user_remainders(cls, user): ''' Return: Mapping[int->int]: A dictionary that maps the category ID to the user's remainder for that category. ''' categories = inventory.Category.objects.all() cart_filter = ( Q(product__productitem__cart__user=user) & Q(product__productitem__cart__status=commerce.Cart.STATUS_PAID) ) quantity = When( cart_filter, then='product__productitem__quantity' ) quantity_or_zero = Case( quantity, default=Value(0), ) remainder = Case( When(limit_per_user=None, then=Value(99999999)), default=F('limit_per_user') - Sum(quantity_or_zero), ) categories = categories.annotate(remainder=remainder) return dict((cat.id, cat.remainder) for cat in categories)
def _calculate_quantities(cls, user): reserved_carts = cls._relevant_carts(user) # Calculate category lines item_cats = F('categories__product__productitem__product__category') reserved_category_products = ( Q(categories=item_cats) & Q(categories__product__productitem__cart__in=reserved_carts) ) # Calculate product lines reserved_products = ( Q(products=F('products__productitem__product')) & Q(products__productitem__cart__in=reserved_carts) ) category_quantity_in_reserved_carts = When( reserved_category_products, then="categories__product__productitem__quantity", ) product_quantity_in_reserved_carts = When( reserved_products, then="products__productitem__quantity", ) quantity_or_zero = Case( category_quantity_in_reserved_carts, product_quantity_in_reserved_carts, default=Value(0), ) return quantity_or_zero
def _calculate_quantities(cls, user): reserved_carts = cls._relevant_carts(user) quantity_in_reserved_carts = When( discountitem__cart__in=reserved_carts, then="discountitem__quantity" ) quantity_or_zero = Case( quantity_in_reserved_carts, default=Value(0) ) return quantity_or_zero
def user_remainders(cls, user): ''' Return: Mapping[int->int]: A dictionary that maps the product ID to the user's remainder for that product. ''' products = inventory.Product.objects.all() cart_filter = ( Q(productitem__cart__user=user) & Q(productitem__cart__status=commerce.Cart.STATUS_PAID) ) quantity = When( cart_filter, then='productitem__quantity' ) quantity_or_zero = Case( quantity, default=Value(0), ) remainder = Case( When(limit_per_user=None, then=Value(99999999)), default=F('limit_per_user') - Sum(quantity_or_zero), ) products = products.annotate(remainder=remainder) return dict((product.id, product.remainder) for product in products)
def speaker_registrations(request, form): ''' Shows registration status for speakers with a given proposal kind. ''' kinds = form.cleaned_data["kind"] presentations = schedule_models.Presentation.objects.filter( proposal_base__kind__in=kinds, ).exclude( cancelled=True, ) users = User.objects.filter( Q(speaker_profile__presentations__in=presentations) | Q(speaker_profile__copresentations__in=presentations) ) paid_carts = commerce.Cart.objects.filter(status=commerce.Cart.STATUS_PAID) paid_carts = Case( When(cart__in=paid_carts, then=Value(1)), default=Value(0), output_field=models.IntegerField(), ) users = users.annotate(paid_carts=Sum(paid_carts)) users = users.order_by("paid_carts") return QuerysetReport( "Speaker Registration Status", ["id", "speaker_profile__name", "email", "paid_carts"], users, link_view=attendee, ) return []
def get_queryset(self): """Get queryset with custom annotations.""" base_query_set = super(PeeringSessionManager, self).get_queryset() query_set = base_query_set.annotate( session_state=models.Case( models.When(provisioning_state=2, then=models.Case( models.When(admin_state=2, then=models.Case( models.When(operational_state=6, then=models.Value('Up')), default=models.Value('Down') )), default=models.Value('Admin Down') )), models.When(provisioning_state=1, then=models.Value('Provisioning')), default=models.Value('None'), output_field=models.CharField() ), local_address=models.Case( models.When(af=1, then=models.F('prngrtriface__netixlan__ipaddr4')), models.When(af=2, then=models.F('prngrtriface__netixlan__ipaddr6')), default=None, output_field=IPAddressField() ), remote_address=models.Case( models.When(af=1, then=models.F('peer_netixlan__ipaddr4')), models.When(af=2, then=models.F('peer_netixlan__ipaddr6')), default=None, output_field=IPAddressField() ), address_family=models.Case( models.When(af=1, then=models.Value('IPv4')), models.When(af=2, then=models.Value('IPv6')), default=models.Value('Unknown'), output_field=models.CharField() ), ixp_name=models.F('prngrtriface__netixlan__ixlan__ix__name'), router_hostname=models.F('prngrtriface__prngrtr__hostname'), remote_network_name=models.F('peer_netixlan__net__name'), remote_network_asn=models.F('peer_netixlan__net__asn') ) return query_set
def setup_queryset(self, *args, **kwargs): """ The queryset is annotated so that it can be sorted by number of errors and number of warnings; but note that the criteria for finding the log messages to populate these fields should match those used in the Build model (orm/models.py) to populate the errors and warnings properties """ queryset = self.get_builds() # Don't include in progress builds pr cancelled builds queryset = queryset.exclude(Q(outcome=Build.IN_PROGRESS) | Q(outcome=Build.CANCELLED)) # sort queryset = queryset.order_by(self.default_orderby) # annotate with number of ERROR, EXCEPTION and CRITICAL log messages criteria = (Q(logmessage__level=LogMessage.ERROR) | Q(logmessage__level=LogMessage.EXCEPTION) | Q(logmessage__level=LogMessage.CRITICAL)) queryset = queryset.annotate( errors_no=Count( Case( When(criteria, then=Value(1)), output_field=IntegerField() ) ) ) # annotate with number of WARNING log messages queryset = queryset.annotate( warnings_no=Count( Case( When(logmessage__level=LogMessage.WARNING, then=Value(1)), output_field=IntegerField() ) ) ) self.queryset = queryset
def getAveragesByClassType(startDate=None,endDate=None): # If a date filter was passed in GET, then apply it when_all = { 'classdescription__series__eventregistration__cancelled': False, 'classdescription__series__eventregistration__dropIn': False } timeFilters = {} classFilters = {} roleFilters = Q() if startDate: timeFilters['classdescription__series__startTime__gte'] = startDate classFilters['startTime__gte'] = startDate roleFilters = roleFilters & (Q(eventrole__event__startTime__gte=startDate) | Q(eventregistration__event__startTime__gte=startDate)) if endDate: timeFilters['classdescription__series__startTime__lte'] = endDate classFilters['startTime__lte'] = endDate roleFilters = roleFilters & (Q(eventrole__event__startTime__lte=endDate) | Q(eventregistration__event__startTime__lte=endDate)) when_all.update(timeFilters) role_list = DanceRole.objects.filter(roleFilters).distinct() annotations = {'registrations': Sum(Case(When(Q(**when_all),then=1),output_field=IntegerField()))} values_list = ['name', 'danceType__name','registrations'] for this_role in role_list: annotations[this_role.pluralName] = Sum(Case(When(Q(Q(**when_all) & Q(classdescription__series__eventregistration__role=this_role)),then=1),output_field=IntegerField())) values_list.append(this_role.pluralName) registration_counts = list(DanceTypeLevel.objects.annotate(**annotations).values_list(*values_list)) class_counter = Counter([(x.classDescription.danceTypeLevel.name, x.classDescription.danceTypeLevel.danceType.name) for x in Series.objects.filter(**classFilters).distinct()]) results = {} for list_item in registration_counts: type_name = ' '.join((str(list_item[0]),str(list_item[1]))) results[type_name] = { str(_('Registrations')): list_item[2], } m = 3 for this_role in role_list: results[type_name][str(_('Total %s' % this_role.pluralName))] = list_item[m] m += 1 for k,count in class_counter.items(): type_name = ' '.join((str(k[0]),str(k[1]))) results[type_name].update({ str(_('Series')): count }) for k,v in results.items(): if results[k].get(str(_('Series'))): results[k].update({ str(_('Average Registrations')): (results[k][str(_('Registrations'))] or 0) / float(results[k][str(_('Series'))]), }) for this_role in role_list: results[k][str(_('Average %s' % this_role.pluralName))] = (results[k][str(_('Total %s' % this_role.pluralName))] or 0) / float(results[k][str(_('Series'))]) return results
def get_queryset(self): queryset = Activity.objects.all() if self.action == 'timeline': user = self.request.user queryset = queryset.filter( models.Q( user__in=models.Subquery( user.following.through.objects.filter( to_user_id=user.id ).values('from_user_id') ) ) | ( models.Q( brick_name__in=models.Subquery( user.bricks_watching.through.objects.filter( user=user.id ).values('brick') ) ) & ~models.Q(user=user.id) ) ) user = self.request.query_params.get('user', None) type = self.request.query_params.get('type', None) if user is not None: queryset = queryset.filter(user__username=user) if type is not None: queryset = queryset.filter(type__in=type.split(',')) queryset = queryset.annotate( score=models.Case( models.When( type='Watch', then=models.Subquery( BiobrickMeta.objects .filter(part_name=models.OuterRef('brick_name')) .values('rate_score') ) ), default=None ) ) return queryset.order_by('-acttime')
def rsvp_form(request): guest_query = Guest.objects.filter(id=request.session['guest_id']) guest = guest_query.first() if guest.group: # Use case/when to force currently logged in guest to be first guests = guest.group.guest_set.order_by(Case(When(id=guest.id, then=0), default=1), "last_name", "first_name") else: guests = guest_query if request.method == 'POST': formset = RsvpFormSet(request.POST) for form in formset: form.fields['guest'].queryset = guests if form.is_valid(): changes = {} for form in formset: form.full_clean() guest = form.cleaned_data['guest'] if form.changed_data: changes[guest] = {'old_attending': guest.attending, **form.cleaned_data} guest.attending = form.cleaned_data['attending'] guest.email = form.cleaned_data['email'] guest.dietary_requirements = form.cleaned_data['dietary_requirements'] guest.dietary_other = form.cleaned_data['dietary_other'] guest.comments = form.cleaned_data['comments'] guest.save() send_update_mail(changes, user=guest_query.first()) return JsonResponse({ 'redirect': '/thanks' }) else: for form in formset: form.initial = {'guest': Guest.objects.get(id=form['guest'].value())} else: formset = RsvpFormSet(initial=[{ 'guest': g, 'email': g.email, 'attending': g.attending, 'comments': g.comments, 'dietary_requirements': g.dietary_requirements, 'dietary_other': g.dietary_other, } for g in guests]) return JsonResponse({ 'content': render_to_string('rsvp/rsvp_form.html', {'rsvp_formset': formset, 'action': urlresolvers.reverse('rsvp-form')}, request=request) })
def _items(self, cart_status, category=None): ''' Aggregates the items that this user has purchased. Arguments: cart_status (int or Iterable(int)): etc category (Optional[models.inventory.Category]): the category of items to restrict to. Returns: [ProductAndQuantity, ...]: A list of product-quantity pairs, aggregating like products from across multiple invoices. ''' if not isinstance(cart_status, Iterable): cart_status = [cart_status] status_query = ( Q(productitem__cart__status=status) for status in cart_status ) in_cart = Q(productitem__cart__user=self.user) in_cart = in_cart & reduce(operator.__or__, status_query) quantities_in_cart = When( in_cart, then="productitem__quantity", ) quantities_or_zero = Case( quantities_in_cart, default=Value(0), ) products = inventory.Product.objects if category: products = products.filter(category=category) products = products.select_related("category") products = products.annotate(quantity=Sum(quantities_or_zero)) products = products.filter(quantity__gt=0) out = [] for prod in products: out.append(ProductAndQuantity(prod, prod.quantity)) return out
def group_by_cart_status(queryset, order, values): queryset = queryset.annotate( is_reserved=Case( When(cart__in=commerce.Cart.reserved_carts(), then=Value(True)), default=Value(False), output_field=models.BooleanField(), ), ) values = queryset.order_by(*order).values(*values) values = values.annotate( total_paid=Sum(Case( When( cart__status=commerce.Cart.STATUS_PAID, then=F("quantity"), ), default=Value(0), )), total_refunded=Sum(Case( When( cart__status=commerce.Cart.STATUS_RELEASED, then=F("quantity"), ), default=Value(0), )), total_unreserved=Sum(Case( When( ( Q(cart__status=commerce.Cart.STATUS_ACTIVE) & Q(is_reserved=False) ), then=F("quantity"), ), default=Value(0), )), total_reserved=Sum(Case( When( ( Q(cart__status=commerce.Cart.STATUS_ACTIVE) & Q(is_reserved=True) ), then=F("quantity"), ), default=Value(0), )), ) return values
def get_suggested_community_receivers(instance, user_type=USER_TYPE_DEVELOPER, respect_visibility=True): # Filter users based on nature of work queryset = get_user_model().objects.filter( type=user_type ) # Only developers on client's team if respect_visibility and instance.visibility == VISIBILITY_MY_TEAM and user_type == USER_TYPE_DEVELOPER: queryset = queryset.filter( my_connections_q_filter(instance.user) ) ordering = [] # Order by matching skills task_skills = instance.skills.all() if task_skills: when = [] for skill in task_skills: new_when = When( userprofile__skills=skill, then=1 ) when.append(new_when) queryset = queryset.annotate(matches=Sum( Case( *when, default=0, output_field=IntegerField() ) )) ordering.append('-matches') # Order developers by tasks completed if user_type == USER_TYPE_DEVELOPER: queryset = queryset.annotate( tasks_completed=Sum( Case( When( participation__task__closed=True, participation__user__id=F('id'), participation__status=STATUS_ACCEPTED, then=1 ), default=0, output_field=IntegerField() ) ) ) ordering.append('-tasks_completed') if ordering: queryset = queryset.order_by(*ordering) if queryset: return queryset[:15] return