我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 django.db.models.Value()。
def donations(request):
    """Return representatives, committees and aggregated donations as JSON.

    The response payload has three keys:

    * ``representatives`` -- mapping of representative id to a dict with
      ``id``, ``name`` (first and last name joined by a space) and ``party``.
    * ``committees`` -- mapping of SuperPAC id to a dict with ``id`` and
      ``name``.
    * ``donations`` -- list of dicts with ``source`` (the SuperPAC id),
      ``destination`` (the representative id), ``support`` and ``amount``
      (the summed amount for that source/destination/support combination).
    """
    representative_rows = (
        Representative.objects.all()
        .annotate(name=Concat('first_name', Value(" "), 'last_name'))
        .values("id", "name", "party")
    )
    committee_rows = SuperPAC.objects.all().values("id", "name")
    # values() placed before annotate(Sum(...)) makes the sum be computed per
    # distinct (source, destination, support) combination.
    donation_rows = (
        Donation.objects.all()
        .annotate(source=F("superpac_id"), destination=F("representative_id"))
        .values("source", "destination", "support")
        .annotate(amount=Sum("amount"))
    )

    payload = {}
    payload["representatives"] = {row["id"]: row for row in representative_rows}
    payload["committees"] = {row["id"]: row for row in committee_rows}
    payload["donations"] = list(donation_rows)
    return JsonResponse(payload)
def form_valid(self, form):
    """Attach the owning ProductFinal (if any) and save the form.

    When ``self.__product_pk`` is set, the matching ``ProductFinal`` is
    loaded and stored both on the request and on the form instance.

    A ``ValidationError`` or ``IntegrityError`` raised while saving is
    reported as an error on the ``"value"`` field and the invalid-form
    response is returned instead.
    """
    # NOTE(review): ``__product_pk`` is a name-mangled private attribute, so
    # this method only resolves it correctly when defined inside its class.
    if self.__product_pk:
        owner = ProductFinal.objects.get(pk=self.__product_pk)
        self.request.product_final = owner
        form.instance.product_final = owner

    try:
        return super(ProductUniqueCreate, self).form_valid(form)
    except ValidationError as e:
        problem = e
    except IntegrityError:
        problem = _("Value existing")

    # Both failure modes are surfaced on the "value" field.
    form._errors.setdefault("value", ErrorList()).append(problem)
    return super(ProductUniqueCreate, self).form_invalid(form)
def get_user_languages(self, language_preferences):
    """Return every language, ordered from best to worst match.

    ``language_preferences`` is a sequence of language codes, best first.
    Each row is annotated with an integer ``score``:

    * preferred languages (matched case-insensitively on ``code``) get a
      score that grows with preference, down to 0 for the least preferred;
    * the default language gets -1 unless it matched as a preference;
    * every other language gets -2.

    The result is ordered by descending ``score``, then by ``order``.
    """
    # Enumerating the reversed list gives the last (worst) preference the
    # score 0 and the first (best) preference the highest score.
    ranked_codes = enumerate(reversed(language_preferences))
    branches = [
        When(code__iexact=code, then=Value(rank))
        for rank, code in ranked_codes
    ]
    # Checked after all preferences, so a preferred default keeps its rank.
    branches.append(When(is_default=True, then=Value(-1)))

    score = Case(
        *branches,
        default=Value(-2),
        output_field=models.IntegerField(null=True)
    )
    return self.annotate(score=score).order_by('-score', 'order')
def serve(self, request):
    """Redirect to the best available translation of this page.

    Live child pages whose language is among the user's languages are
    ranked by the position of their language in that list; the first one
    is redirected to.

    Raises:
        Http404: if no candidate translation exists.
    """
    preferences = get_request_language_preference(request)
    languages = Language.objects.get_user_languages(preferences)

    candidate_pages = (
        TranslatedPage.objects
        .live().specific()
        .child_of(self)
        .filter(language__in=languages)
    )
    # Lower score means a better language; ordering is ascending.
    ranking = [
        When(language=language, then=Value(position))
        for position, language in enumerate(languages)
    ]
    language_score = Case(
        *ranking,
        default=Value(None),
        output_field=models.IntegerField(null=True)
    )
    candidate_pages = candidate_pages.annotate(
        language_score=language_score
    ).order_by('language_score')

    best = candidate_pages.first()
    if not best:
        # No translation was found, not even in the default language.
        raise Http404
    return redirect(best.url)
def status_summary(self):
    """Return the number of sessions per status label.

    Each row is labelled as follows and the result is a values queryset of
    ``{'label': ..., 'value': <count>}`` rows:

    * ``provisioning_state == 2``: ``'Up'`` when ``admin_state == 2`` and
      ``operational_state == 6``; ``'Down'`` when ``admin_state == 2``
      otherwise; ``'Admin Down'`` for any other ``admin_state``.
    * ``provisioning_state == 1``: ``'Provisioning'``.
    * anything else: ``'None'`` (the string).
    """
    sessions = super(PeeringSessionManager, self).get_queryset()

    # Built inside-out so each nesting level reads on its own.
    operational_label = models.Case(
        models.When(operational_state=6, then=models.Value('Up')),
        default=models.Value('Down')
    )
    admin_label = models.Case(
        models.When(admin_state=2, then=operational_label),
        default=models.Value('Admin Down')
    )
    label = models.Case(
        models.When(provisioning_state=2, then=admin_label),
        models.When(provisioning_state=1, then=models.Value('Provisioning')),
        default=models.Value('None'),
        output_field=models.CharField()
    )

    labelled = sessions.annotate(label=label)
    return labelled.values('label').annotate(value=models.Count('label'))
def viral_video_detail(request, id):
    """Count one impression for a viral video and render its detail page.

    The video is annotated with ``total_impressions`` (desktop + mobile)
    and a ``label``: ``"popular"`` above ``POPULAR_FROM`` impressions,
    ``"new"`` if created after yesterday's date, otherwise ``"cool"``.

    Depending on ``request.flavour`` the mobile or the desktop impression
    counter is incremented in the database before the object is loaded.
    Responds with 404 when no video has the given primary key.
    """
    day_before = datetime.date.today() - datetime.timedelta(days=1)

    impressions = (
        models.F("desktop_impressions") + models.F("mobile_impressions")
    )
    label = models.Case(
        models.When(total_impressions__gt=POPULAR_FROM,
                    then=models.Value("popular")),
        models.When(created__gt=day_before, then=models.Value("new")),
        default=models.Value("cool"),
        output_field=models.CharField(),
    )
    qs = ViralVideo.objects.annotate(
        total_impressions=impressions,
        label=label,
    )

    # DEBUG: check the SQL query that Django ORM generates
    print(qs.query)

    qs = qs.filter(pk=id)
    if request.flavour == "mobile":
        counter = "mobile_impressions"
    else:
        counter = "desktop_impressions"
    # F() keeps the increment inside the database statement.
    qs.update(**{counter: models.F(counter) + 1})

    context = {'video': get_object_or_404(qs)}
    return render(request, "viral_videos/viral_video_detail.html", context)
def get_queryset(self):
    """Return budget totals with a calculated service area attached.

    Rows are grouped by fiscal year, service area code, bureau code and
    bureau name. ``sa_calced`` is ``'LA'`` or ``'EO'`` for bureaus listed
    in ``LA_Bureaus`` / ``EO_Bureaus`` and otherwise falls back to
    ``service_area_code``; ``amount`` is the summed amount per group.
    (Some bureaus are in service areas not reflected by the data.)
    """
    grouping = ('fiscal_year', 'service_area_code', 'bureau_code',
                'bureau_name')

    calculated_area = Case(
        When(bureau_code__in=LA_Bureaus, then=Value('LA')),
        When(bureau_code__in=EO_Bureaus, then=Value('EO')),
        # A bare string default is handed to Case unchanged; Django is
        # expected to read it as a reference to that column.
        default='service_area_code',
        output_field=CharField(),
    )

    history = models.BudgetHistory.objects.all().values(*grouping)
    history = history.annotate(sa_calced=calculated_area,
                               amount=Sum('amount'))
    return history.order_by(*grouping)
def get_queryset(self):
    """Return budget totals per fiscal year and calculated service area.

    ``sa_calced`` is ``'LA'`` or ``'EO'`` for bureaus listed in
    ``LA_Bureaus`` / ``EO_Bureaus`` and otherwise falls back to
    ``service_area_code``; ``amount`` is the summed amount.
    (Some bureaus are in service areas not reflected by the data.)
    """
    calculated_area = Case(
        When(bureau_code__in=LA_Bureaus, then=Value('LA')),
        When(bureau_code__in=EO_Bureaus, then=Value('EO')),
        # A bare string default is handed to Case unchanged; Django is
        # expected to read it as a reference to that column.
        default='service_area_code',
        output_field=CharField(),
    )

    history = models.BudgetHistory.objects.all().values('fiscal_year')
    history = history.annotate(
        sa_calced=calculated_area,
        amount=Sum('amount'),
    )
    return history.order_by('fiscal_year', 'sa_calced')
def update_list_best_practice():
    """Append ``' e.V'`` to every community name starting with 'community'.

    The change is issued as one queryset ``update()``, so the database is
    hit once no matter how many objects match.

    Caveats:
      - ``save()`` will not be called, and the related signals will not be
        sent.
      - does not work with m2m relationships.

    Roughly equivalent SQL::

        UPDATE community SET name = (community.name + ' e.V')
        WHERE community.name LIKE 'community%'
    """
    suffixed_name = Concat('name', Value(' e.V'))
    matching = Community.objects.filter(name__startswith='community')
    matching.update(name=suffixed_name)


## Bulk delete ##############
def earlier_registration_discount(event_id):
    """Set the early-bird discount on every registration of an event.

    Registrations made at least four weeks before the event start get 15,
    at least three weeks 10, at least two weeks 5, and all others 0.
    Prints a message and does nothing when the event does not exist.
    """
    try:
        event = Event.objects.values('start').get(id=event_id)
    except Event.DoesNotExist:
        print("Insert valid event ID")
        return

    starts_at = event['start']
    # (weeks before start, discount), most generous tier first because the
    # first matching WHEN wins.
    tiers = ((4, 15), (3, 10), (2, 5))
    branches = [
        When(Q(registered_on__lte=starts_at - timedelta(weeks=weeks)),
             then=Value(discount))
        for weeks, discount in tiers
    ]
    Registration.objects.filter(event_id=event_id).update(
        discount=Case(*branches, default=Value(0))
    )
    # Example of the resulting statement:
    # UPDATE registration SET discount = CASE
    #   WHEN registration.registered_on <= '2016-07-15 18:00:00' THEN 15
    #   WHEN registration.registered_on <= '2016-07-22 18:00:00' THEN 10
    #   WHEN registration.registered_on <= '2016-07-29 18:00:00' THEN 5
    #   ELSE 0
    # END WHERE registration.event_id = 3
def __init__(self, expression, pos, length=None, **extra):
    """Build a substring expression.

    expression: the name of a field, or an expression returning a string
    pos: an integer > 0, or an expression returning an integer
    length: an optional number of characters to return

    Raises ValueError when a plain ``pos`` is smaller than 1.
    """
    def as_expression(argument):
        # Anything that is not already an expression becomes a literal.
        if hasattr(argument, 'resolve_expression'):
            return argument
        return Value(argument)

    # Only plain values can be range-checked here.
    if not hasattr(pos, 'resolve_expression') and pos < 1:
        raise ValueError("'pos' must be greater than 0")

    arguments = [expression, as_expression(pos)]
    if length is not None:
        arguments.append(as_expression(length))
    super(Substr, self).__init__(*arguments, **extra)
def get_queryset(self):
    """Return past presentations merged with skipped meetings, newest first.

    Presentation rows expose ``date``, ``presentation_type``,
    ``presenter_name`` and ``present_content``; rows belonging to the
    latest meeting by date are left out. Skipped meetings dated today or
    earlier are added with the type ``'Postponed: <reason>'`` and empty
    presenter/content columns.

    ``latest('date')`` raises ``DoesNotExist`` when there is no meeting.
    """
    blank = ExpressionWrapper(V(''), output_field=CharField())
    upcoming = models.MeetingHistory.objects.latest('date')

    # order_by() with no arguments clears ordering on each side; the
    # combined result is ordered once at the end.
    presented = models.PresentHistory.objects.values(
        date=F('meeting__date'),
        presentation_type=F('present_type'),
        presenter_name=F('presenter__name'),
        present_content=F('content'),
    ).exclude(meeting__date=upcoming.date).order_by()

    skipped = models.MeetingSkip.objects.all().values(
        'date',
        presentation_type=Concat(V('Postponed: '), 'reason'),
        presenter_name=blank,
        present_content=blank,
    ).filter(date__lte=date.today()).order_by()

    return presented.union(skipped).order_by('-date')
def list_employees_search(request, name):
    """Render the active employees visible to the user, filtered by name.

    The candidate list comes from ``get_list_for_role(request)``
    (per the original note: the employees the administrator has access
    to, i.e. those in their company). Unless ``name`` is the sentinel
    ``"all_true"``, only employees whose "first last" name contains
    ``name`` (case-insensitive) are kept.

    context: ``employees`` -- the resulting employee queryset
    template: employee/employee_search.html
    """
    visible = get_list_for_role(request)

    # Small optimisation: skip the name annotation when no search is wanted.
    if name != "all_true":
        full_name = Concat('user__first_name', V(' '), 'user__last_name')
        visible = visible.annotate(search_name=full_name)
        visible = visible.filter(search_name__icontains=name)

    context = {"employees": visible.filter(user__is_active=True)}
    return render(request, "employee/employee_search.html", context)
def reorder(_default=AFTER, _reverse=False, **kwargs):
    """Return an ordering expression that follows an explicit value order.

    Exactly one keyword argument is required: ``<fieldname>=<values>``.
    Rows whose field equals ``values[i]`` sort at position ``i``. Rows
    matching none of the values sort at ``_default``: ``BEFORE`` puts them
    first (-1), ``AFTER`` puts them last (``len(values)``), and any other
    value is used as given.

    The expression is ascending unless ``_reverse`` is true.

    Raises:
        TypeError: if the number of keyword arguments is not exactly one.
    """
    if len(kwargs) != 1:
        raise TypeError("reorder() takes one non-optional keyword argument")
    (fieldname, new_order), = kwargs.items()

    if _default is BEFORE:
        fallback = -1
    elif _default is AFTER:
        fallback = len(new_order)
    else:
        fallback = _default

    branches = []
    for position, wanted in enumerate(new_order):
        branches.append(When(**{fieldname: wanted, 'then': position}))

    ordering = Case(*branches, default=Value(fallback),
                    output_field=IntegerField())
    return ordering.desc() if _reverse else ordering.asc()
def pre_filter(self, queryset, user):
    """Keep the items that are currently available and not exhausted.

    An item is kept when the current time lies inside its optional
    ``start_time``/``end_time`` window and its ``remainder`` is positive.
    ``remainder`` is ``limit`` minus the summed quantities produced by
    ``self._calculate_quantities(user)``; items without a limit get
    ``_BIG_QUANTITY`` instead.
    """
    now = timezone.now()

    # No start time, or the start time has already been reached.
    has_started = Q(start_time=None) | Q(start_time__lte=now)
    # No end time, or the end time has not passed yet.
    not_finished = Q(end_time=None) | Q(end_time__gte=now)
    queryset = queryset.filter(has_started).filter(not_finished)

    # Drop items that have been reserved up to (or beyond) their limit.
    reserved = Sum(self._calculate_quantities(user))
    remainder = Case(
        When(limit=None, then=Value(_BIG_QUANTITY)),
        default=F("limit") - reserved,
    )
    return queryset.annotate(remainder=remainder).filter(remainder__gt=0)
def credit_notes(request, form):
    """Build the report that lists all of the credit notes in the system.

    Neither ``request`` nor ``form`` is read here. Each row links to
    ``views.credit_note``.
    """
    # Relations are joined up front since the report columns reach into them.
    related = (
        "creditnoterefund",
        "creditnoteapplication",
        "invoice",
        "invoice__user__attendee__attendeeprofilebase",
    )
    notes = commerce.CreditNote.objects.all().select_related(*related)

    columns = [
        "id",
        "invoice__user__attendee__attendeeprofilebase__invoice_recipient",
        "status",
        "value",
    ]
    return QuerysetReport(
        "Credit Notes",
        columns,
        notes,
        headings=["id", "Owner", "Status", "Value"],
        link_view=views.credit_note,
    )
def search(request):
    """Search all available sources for the user's query.

    The query is read from the ``query`` GET parameter (a missing
    parameter raises, as ``request.GET[...]`` is used).

    * A query of exactly five digits is treated as a ZIP code: the
      representatives returned by ``SunlightAPI().rep_by_zip`` are looked
      up by full name in the local database and returned as
      ``{"representatives": [...], "committees": []}``.
    * Otherwise representatives (by full name) and SuperPACs (by name) are
      matched case-insensitively. When the query contains a dot or is
      longer than five characters, ``SunlightAPI().search`` results are
      added under ``bills``.
    """
    query = request.GET["query"]

    # Raw string: "\d" in a normal string literal is an invalid escape
    # sequence and triggers a warning on current Python versions.
    if re.match(r"^\d{5}$", query):
        # External API call: representatives for this ZIP code.
        results = SunlightAPI().rep_by_zip(query)

        reps = []
        for rep in results["results"]:
            # Try to find the API's representative in our own database.
            reps += (Representative.objects.all()
                     .annotate(name=Concat('first_name', Value(" "),
                                           'last_name'))
                     .filter(name=rep["first_name"] + " " + rep["last_name"])
                     .values("id", "name", "party"))

        return JsonResponse({"representatives": reps, "committees": []})

    data = {}
    data["representatives"] = list(
        Representative.objects.all()
        .annotate(name=Concat('first_name', Value(" "), 'last_name'))
        .filter(name__icontains=query)
        .values("id", "name", "party"))
    data["committees"] = list(
        SuperPAC.objects.filter(name__icontains=query)
        .values("id", "name"))

    if "." in query or len(query) > 5:
        results = SunlightAPI().search(query)
        data["bills"] = results["results"]

    return JsonResponse(data)
def form_valid(self, form):
    """Resolve the attribute value from the form, then save.

    The value is taken from ``value_bool`` (as an int), ``value_free`` or
    ``value_list`` depending on the attribute's ``type_value``. A list
    value must be an option belonging to one of the attribute's groups.
    An unknown type or a ``None`` value is rejected.

    On rejection an error is added to the ``"value"`` field and the
    invalid-form response is returned. On success the value (and, when
    ``self.__product_pk`` is set, the product) is stored on both the
    request and the form instance before delegating to the parent class.
    """
    # NOTE(review): the rejection paths previously called
    # ``super(ProductFeatureCreate, self)`` while the success path used
    # ``ProductFinalAttributeCreate``. They now consistently use the class
    # named on the success path; confirm that it is the enclosing class.
    attribute = form.cleaned_data['attribute']

    if attribute.type_value == TYPE_VALUE_BOOLEAN:
        value = int(form.cleaned_data['value_bool'])
    elif attribute.type_value == TYPE_VALUE_FREE:
        value = form.cleaned_data['value_free']
    elif attribute.type_value == TYPE_VALUE_LIST:
        value = form.cleaned_data['value_list']
        # The chosen option must belong to a group of this attribute.
        if not OptionValueAttribute.objects.filter(
            pk=value,
            group__attributes=attribute
        ).exists():
            errors = form._errors.setdefault("value", ErrorList())
            errors.append(_('Option invalid'))
            return super(ProductFinalAttributeCreate, self).form_invalid(form)
    else:
        value = None

    if value is None:
        errors = form._errors.setdefault("value", ErrorList())
        errors.append(_('Value invalid'))
        return super(ProductFinalAttributeCreate, self).form_invalid(form)

    self.request.value = value
    form.instance.value = value

    if self.__product_pk:
        product = ProductFinal.objects.get(pk=self.__product_pk)
        self.request.product = product
        form.instance.product = product

    return super(ProductFinalAttributeCreate, self).form_valid(form)
def form_valid(self, form):
    """Resolve the attribute value from the form, then save the update.

    The value is taken from ``value_bool`` (as an int), ``value_free`` or
    ``value_list`` depending on the attribute's ``type_value``. A list
    value must be an option belonging to one of the attribute's groups.
    An unknown type or a ``None`` value is rejected with an error on the
    ``"value"`` field.
    """
    def reject(message):
        # Record the problem on the "value" field and re-display the form.
        form._errors.setdefault("value", ErrorList()).append(message)
        return super(ProductFinalAttributeUpdate, self).form_invalid(form)

    attribute = form.cleaned_data['attribute']
    kind = attribute.type_value

    value = None
    if kind == TYPE_VALUE_BOOLEAN:
        value = int(form.cleaned_data['value_bool'])
    elif kind == TYPE_VALUE_FREE:
        value = form.cleaned_data['value_free']
    elif kind == TYPE_VALUE_LIST:
        value = form.cleaned_data['value_list']
        option_known = OptionValueAttribute.objects.filter(
            pk=value,
            group__attributes=attribute
        ).exists()
        if not option_known:
            return reject(_('Option invalid'))

    if value is None:
        return reject(_('Value invalid'))

    self.request.value = value
    form.instance.value = value
    return super(ProductFinalAttributeUpdate, self).form_valid(form)
def form_valid(self, form):
    """Resolve the feature value from the form, then save.

    The value is taken from ``value_bool`` (as an int), ``value_free`` or
    ``value_list`` depending on the feature's ``type_value``. A list value
    must be an option belonging to one of the feature's groups. An unknown
    type or a ``None`` value is rejected with an error on the ``"value"``
    field.

    On success the value (and, when ``self.__product_pk`` is set, the
    product) is stored on both the request and the form instance.
    """
    def reject(message):
        # Record the problem on the "value" field and re-display the form.
        form._errors.setdefault("value", ErrorList()).append(message)
        return super(ProductFeatureCreate, self).form_invalid(form)

    feature = form.cleaned_data['feature']
    kind = feature.type_value

    value = None
    if kind == TYPE_VALUE_BOOLEAN:
        value = int(form.cleaned_data['value_bool'])
    elif kind == TYPE_VALUE_FREE:
        value = form.cleaned_data['value_free']
    elif kind == TYPE_VALUE_LIST:
        value = form.cleaned_data['value_list']
        option_known = OptionValueFeature.objects.filter(
            pk=value,
            group__features=feature
        ).exists()
        if not option_known:
            return reject(_('Option invalid'))

    if value is None:
        return reject(_('Value invalid'))

    self.request.value = value
    form.instance.value = value

    # NOTE(review): name-mangled private attribute; only resolves correctly
    # when this method is defined inside its class.
    if self.__product_pk:
        product = Product.objects.get(pk=self.__product_pk)
        self.request.product = product
        form.instance.product = product

    return super(ProductFeatureCreate, self).form_valid(form)
def form_valid(self, form):
    """Resolve the feature value from the form, then save the update.

    The value is taken from ``value_bool`` (as an int), ``value_free`` or
    ``value_list`` depending on the feature's ``type_value``. A list value
    must be an option belonging to one of the feature's groups. An unknown
    type or a ``None`` value is rejected with an error on the ``"value"``
    field.
    """
    def reject(message):
        # Record the problem on the "value" field and re-display the form.
        form._errors.setdefault("value", ErrorList()).append(message)
        return super(ProductFeatureUpdate, self).form_invalid(form)

    feature = form.cleaned_data['feature']
    kind = feature.type_value

    value = None
    if kind == TYPE_VALUE_BOOLEAN:
        value = int(form.cleaned_data['value_bool'])
    elif kind == TYPE_VALUE_FREE:
        value = form.cleaned_data['value_free']
    elif kind == TYPE_VALUE_LIST:
        value = form.cleaned_data['value_list']
        option_known = OptionValueFeature.objects.filter(
            pk=value,
            group__features=feature
        ).exists()
        if not option_known:
            return reject(_('Option invalid'))

    if value is None:
        return reject(_('Value invalid'))

    self.request.value = value
    form.instance.value = value
    return super(ProductFeatureUpdate, self).form_valid(form)
def add_items_update_then_create(self, content_type_pk, objs, config):
    """Write the search vectors of ``objs`` into the index table.

    For each object a search vector is built from its ``_body_`` pairs of
    (text, weight); an object with an empty body gets the vector of an
    empty string. Index entries that already exist for the content type
    are updated one by one; the remaining ones are inserted with a single
    ``bulk_create``.
    """
    objs_by_id = {}
    for obj in objs:
        if obj._body_:
            vector = ADD([
                SearchVector(Value(text), weight=weight, config=config)
                for text, weight in obj._body_])
        else:
            vector = SearchVector(Value(''))
        obj._search_vector = vector
        objs_by_id[obj._object_id] = obj

    all_entries = IndexEntry._default_manager.using(self.db_alias)
    entries_for_ct = all_entries.filter(content_type_id=content_type_pk)

    # Ids among ours that already have an index row.
    already_indexed = frozenset(
        entries_for_ct.filter(object_id__in=objs_by_id)
        .values_list('object_id', flat=True))

    for known_id in already_indexed:
        obj = objs_by_id[known_id]
        entries_for_ct.filter(object_id=obj._object_id) \
            .update(body_search=obj._search_vector)

    missing = [
        IndexEntry(
            content_type_id=content_type_pk,
            object_id=object_id,
            body_search=obj._search_vector,
        )
        for object_id, obj in objs_by_id.items()
        if object_id not in already_indexed
    ]
    all_entries.bulk_create(missing)
def test_annotate_coalesce(self):
    """Coalesce falls back to the literal when ``title_nl`` is NULL.

    Blogs are annotated with ``e``: their ``title_nl`` or, when that is
    missing, the constant ``'EMPTY'``. The expected list relies on the
    blogs created by the test fixture/setup.
    """
    qs = Blog.objects.annotate(
        e=models.functions.Coalesce('title_nl', models.Value('EMPTY'))
    )

    # assertEqual replaces the deprecated alias assertEquals, which no
    # longer exists on current Python versions.
    self.assertEqual(
        list(qs.values_list('e', flat=True)),
        ['Valk', 'EMPTY', 'EMPTY', 'EMPTY', 'Zebra']
    )
def average_for_views(self):
    """Return the satisfaction average and row count per view name.

    ``satisfied`` is mapped to 1 (True) or 0 (False) and averaged; rows
    where it is neither produce no value for the average. The result is a
    values queryset with ``view_name``, ``average`` and ``count``, ordered
    by ``view_name``.
    """
    satisfaction = Case(
        When(satisfied=True, then=Value(1)),
        When(satisfied=False, then=Value(0)),
        output_field=IntegerField(),
    )
    per_view = self.values('view_name').annotate(
        average=Avg(satisfaction),
        count=Count('view_name'),
    )
    return per_view.order_by('view_name')
def get_queryset(self):
    """Return the summed report duration per year and month.

    Each row carries ``year``, ``month``, ``duration`` (the sum for that
    month) and a synthetic ``pk`` built as ``<year>-<month>``.
    """
    return (
        Report.objects.all()
        .annotate(year=ExtractYear('date'), month=ExtractMonth('date'))
        # Restricting to year/month makes the following Sum group by them.
        .values('year', 'month')
        .annotate(duration=Sum('duration'))
        .annotate(pk=Concat('year', Value('-'), 'month'))
    )
def get_queryset(self):
    """Return one row per user for the requested date.

    Each row holds the user ``id``, the constant ``date`` obtained from
    ``self._extract_date()`` and a synthetic ``pk`` of the form
    ``<id>_<date>``. Non-superusers only see themselves and the users they
    supervise.
    """
    reference_date = self._extract_date()
    user = self.request.user

    queryset = get_user_model().objects.values('id').annotate(
        date=Value(reference_date, DateField()),
    )

    # Original note: when the last_reported_date filter is set, a date can
    # only be calculated for users with at least one absence or report.
    if reference_date is None:
        reporters = Report.objects.values('user').distinct()
        absentees = Absence.objects.values('user').distinct()
        queryset = queryset.filter(id__in=reporters.union(absentees))

    queryset = queryset.annotate(
        pk=Concat('id', Value('_'), 'date', output_field=CharField())
    )

    if user.is_superuser:
        return queryset
    return queryset.filter(Q(id=user.id) | Q(supervisors=user))
def get_queryset(self):
    """Return one row per absence type for the requested user and date.

    Each row holds the absence type ``id``, the constant ``date`` and
    ``user`` id, and a synthetic ``pk`` of the form
    ``<user>_<id>_<date>``.

    Only the user themselves, superusers and the user's supervisors may
    see the rows; everyone else gets an empty queryset.
    """
    reference_date = self._extract_date()
    user = self._extract_user()

    queryset = (
        models.AbsenceType.objects.values('id')
        .annotate(date=Value(reference_date, DateField()))
        .annotate(user=Value(user.id, IntegerField()))
        .annotate(pk=Concat(
            'user', Value('_'), 'id', Value('_'), 'date',
            output_field=CharField()
        ))
    )

    viewer = self.request.user
    # Checks run cheapest first; the supervisee lookup only happens for a
    # non-superuser looking at somebody else.
    denied = (
        not viewer.is_superuser
        and viewer.id != user.id
        and not viewer.supervisees.filter(id=user.id).exists()
    )
    if denied:
        return models.AbsenceType.objects.none()
    return queryset
def filter_date(self, queryset, name, value):
    """Keep the rows whose start/end date range contains ``value``.

    A missing ``end_date`` is treated as today. ``name`` is not used.
    """
    effective_end = Coalesce('end_date', Value(date.today()))
    return queryset.annotate(end=effective_end).filter(
        start_date__lte=value,
        end__gte=value
    )
def validate(self, data):
    """Validate the employment as a whole.

    Ensure the end date is after the start date and that the employment
    does not overlap any other employment of the same user. Values missing
    from ``data`` are taken from the existing instance, if there is one.

    :throws: django.core.exceptions.ValidationError
    :return: validated data
    :rtype: dict
    """
    current = self.instance

    def submitted_or_current(field):
        # Falls back to the (falsy) missing instance when creating.
        return data.get(field, current and getattr(current, field))

    start_date = submitted_or_current('start_date')
    end_date = submitted_or_current('end_date')
    if end_date and start_date >= end_date:
        raise ValidationError(_(
            'The end date must be after the start date'
        ))

    user = submitted_or_current('user')
    others = models.Employment.objects.filter(user=user)

    # An unset end date is treated as "ends today", on both sides of the
    # overlap comparison.
    end_date = end_date or date.today()
    others = others.annotate(
        end=Coalesce('end_date', Value(date.today()))
    )
    if current:
        others = others.exclude(id=current.id)

    overlaps = [
        other.start_date <= end_date and start_date <= other.end
        for other in others
    ]
    if any(overlaps):
        raise ValidationError(_(
            "A user can't have multiple employments at the same time"
        ))

    return data
def main_country_priority(queryset, country_field='country'):
    """Order a queryset so rows of the main country come first.

    Rows are annotated with ``main_country``: 1 when ``country_field``
    equals the country whose code is ``MAIN_COUNTRY``, otherwise 0. The
    result is ordered by that flag (descending), then by ``name``.

    Args:
        queryset: the queryset to annotate and order.
        country_field: lookup path of the country relation on the model.

    Raises:
        Country.DoesNotExist: if no country has the ``MAIN_COUNTRY`` code.
    """
    main_country = Country.objects.get(code=MAIN_COUNTRY)

    # Integer literals, matching the declared IntegerField output; string
    # literals here would not agree with the declared output type.
    is_main = Case(
        When(**{country_field: main_country, 'then': Value(1)}),
        default=Value(0),
        output_field=IntegerField()
    )
    return queryset.annotate(main_country=is_main).order_by(
        '-main_country', 'name')
def __init__(self, *args, **kwargs):
    """Build the tower selection form.

    The ``tower`` choice field is created here rather than at class level
    so that changes to the stored towers are picked up without restarting
    the server. Each choice is labelled with the tower nickname (if any),
    the first five characters of its UUID and the e-mail of the user
    profile attached to the tower's network.
    """
    super(SelectTowerForm, self).__init__(*args, **kwargs)

    # Towers that never synced (last_active = None) are given a date ten
    # years back so that they sort after active and inactive towers.
    long_ago = datetime.datetime.now() - datetime.timedelta(days=10*365)
    towers = models.BTS.objects.all().annotate(
        new_last_active=Coalesce('last_active', Value(long_ago))
    ).order_by('-new_last_active')

    choices = []
    for tower in towers:
        owner = models.UserProfile.objects.get(network=tower.network)
        short_uuid = tower.uuid[0:5]
        if tower.nickname:
            label = 'Tower "%s" - %s..' % (tower.nickname, short_uuid)
        else:
            label = 'Tower %s..' % short_uuid
        choices.append((tower.id, '%s (%s)' % (label, owner.user.email)))

    self.fields['tower'] = forms.ChoiceField(
        label="Tower", choices=choices, required=False)

    # Set layout attributes.
    self.helper = FormHelper()
    self.helper.form_id = 'select-tower-form'
    self.helper.form_method = 'post'
    self.helper.form_action = '/dashboard/staff/tower-monitoring'
    self.helper.add_input(Submit('submit', 'Select'))
    self.helper.layout = Layout('tower')
def get(self, request):
    """Handles GET requests.

    Renders the staff towers page. Users who are not staff receive an
    empty response with a 404 status.
    """
    user_profile = models.UserProfile.objects.get(user=request.user)
    if not user_profile.user.is_staff:
        return response.Response('', status=status.HTTP_404_NOT_FOUND)

    # Towers that never synced (last_active = None) are given a date ten
    # years back so that they sort after active and inactive towers.
    long_ago = datetime.datetime.now() - datetime.timedelta(days=10*365)
    towers = models.BTS.objects.all().annotate(
        new_last_active=Coalesce('last_active', Value(long_ago))
    ).order_by('-new_last_active')

    # Attach a comma-separated list of profile e-mails to each tower.
    for tower in towers:
        profiles = models.UserProfile.objects.filter(network=tower.network)
        for profile in profiles:
            email = profile.user.email
            if hasattr(tower, 'user_email'):
                tower.user_email += ',' + email
            else:
                tower.user_email = email

    # Configure the table of towers; paginate only when it is needed.
    tower_table = django_tables.StaffTowerTable(list(towers))
    towers_per_page = 8
    if towers.count() > towers_per_page:
        paginate = {'per_page': towers_per_page}
    else:
        paginate = False
    tables.RequestConfig(request, paginate=paginate).configure(tower_table)

    context = {
        'networks': get_objects_for_user(request.user, 'view_network',
                                         klass=models.Network),
        'user_profile': user_profile,
        'towers': towers,
        'tower_table': tower_table,
    }

    # Render the template.
    page = template.loader.get_template('dashboard/staff/towers.html')
    return http.HttpResponse(page.render(context, request))
def annotate_bookmark(queryset, request=None):
    """Annotate a queryset with whether each object was bookmarked.

    Used on search viewsets. For an authenticated user ``is_bookmarked``
    is the count of the object's bookmarks that belong to that user (so
    zero means "not bookmarked"); without a request or for an anonymous
    user it is the constant ``False``.
    """
    # NOTE(review): ``is_authenticated`` is called as a method here, which
    # matches older Django releases - confirm against the project version.
    if not (request and request.user.is_authenticated()):
        return queryset.annotate(is_bookmarked=Value(False, BooleanField()))

    own_bookmark = Case(
        When(bookmarks__user=request.user, then=True),
        output_field=BooleanField()
    )
    return queryset.annotate(is_bookmarked=Count(own_bookmark))
def get_context_data(self, abbr, season, episode, **kwargs):
    """Build the context for an episode page.

    Looks up the show by ``abbr`` and the episode by show, season number
    and episode number (404 when either is missing). The episode's
    submissions are annotated with ``positives`` (positive votes),
    ``negatives`` (all votes minus positives) and ``score`` (positives
    minus negatives) and ordered best score first.

    Adds ``show``, ``episode`` and ``submissions`` to the context.
    """
    ctx = super().get_context_data()

    show = get_object_or_404(Show, abbr=abbr)
    episode = get_object_or_404(
        Episode, show=show, season__number=season, episode=episode)

    # The Case yields a value only for positive votes, so Count tallies
    # just those.
    positive_votes = Count(
        Case(When(votes__positive=True, then=Value(1)))
    )
    submissions = episode.submissions.annotate(
        positives=positive_votes,
        negatives=Count('votes') - F('positives'),
        score=F('positives') - F('negatives')
    ).order_by('-score')

    ctx['show'] = show
    ctx['episode'] = episode
    ctx['submissions'] = submissions
    return ctx


# Submission form GET and POST
def coalesce(self):
    """Return a copy whose source expressions fall back to ``''``.

    A null on either side would make the whole expression null, so every
    source expression of the copy is wrapped in ``Coalesce(..., '')``.
    The original object is left untouched.
    """
    clone = self.copy()
    wrapped = []
    for source in clone.get_source_expressions():
        wrapped.append(Coalesce(source, Value('')))
    clone.set_source_expressions(wrapped)
    return clone
def toggle_active(self, request, queryset):
    """Invert the ``is_active`` flag of the chosen access rules.

    The flip is done in one ``update()``; afterwards the user is told how
    many of the chosen rules are now active and how many are inactive.
    """
    queryset.update(
        is_active=models.Case(
            models.When(is_active=True, then=models.Value(False)),
            default=models.Value(True)))

    # Counted after the update: rules that are active now were activated.
    activated = queryset.filter(is_active=True).count()
    deactivated = queryset.filter(is_active=False).count()

    # Translate the template first, then fill in the numbers. Formatting
    # inside _() would look up the already-formatted text, which can never
    # match a catalog entry.
    message = _('Activated {0} and deactivated {1} '
                'access rules.').format(activated, deactivated)
    self.message_user(request, message)
def get_queryset(self):
    """Return the base queryset annotated with sales information.

    ``ticket_count`` sums 1 for every related order item, except 0 for
    items whose order is refunded or has an empty or NULL
    ``billed_total``. ``sold_out`` is ``False`` while ``ticket_count`` is
    below ``capacity`` and ``True`` otherwise.
    """
    base = super().get_queryset()

    counts_as_ticket = models.Case(
        models.When(orderitem__order__refunded=True, then=0),
        models.When(orderitem__order__billed_total='', then=0),
        models.When(orderitem__order__billed_total__isnull=True, then=0),
        default=1,
        output_field=models.IntegerField(),
    )
    sold_out = models.Case(
        models.When(
            ticket_count__lt=F('capacity'),
            then=models.Value(False),
        ),
        default=True,
        output_field=models.BooleanField(),
    )

    # Two annotate() calls: sold_out refers to ticket_count.
    counted = base.annotate(ticket_count=models.Sum(counts_as_ticket))
    return counted.annotate(sold_out=sold_out)
def update_output(cls, execution_id, stdout, stderr):
    """Append text to the stored stdout and stderr of an execution.

    The new text is concatenated to the existing column values inside the
    database ``update()``. A falsy ``stdout``/``stderr`` appends an empty
    string.
    """
    new_output = (('stdout', stdout), ('stderr', stderr))
    appended = {
        column: functions.Concat(column, models.Value(text or ''))
        for column, text in new_output
    }
    Execution.objects.filter(id=execution_id).update(**appended)
def user_remainders(cls, user):
    '''
    Return:
        Mapping[int->int]: A dictionary that maps the category ID to the
        user's remainder for that category.

    The remainder is ``limit_per_user`` minus the summed quantity of the
    category's product items in the user's paid carts; categories without
    a per-user limit get 99999999.
    '''
    in_users_paid_cart = (
        Q(product__productitem__cart__user=user) &
        Q(product__productitem__cart__status=commerce.Cart.STATUS_PAID)
    )
    # Items outside the user's paid carts contribute zero.
    bought = Case(
        When(in_users_paid_cart, then='product__productitem__quantity'),
        default=Value(0),
    )
    remainder = Case(
        When(limit_per_user=None, then=Value(99999999)),
        default=F('limit_per_user') - Sum(bought),
    )

    annotated = inventory.Category.objects.all().annotate(
        remainder=remainder)
    return {category.id: category.remainder for category in annotated}
def _calculate_quantities(cls, user):
    """Return an expression giving the quantity reserved per joined item.

    Only items in the carts returned by ``cls._relevant_carts(user)``
    count. The category line is checked first, then the product line;
    rows matching neither evaluate to 0.
    """
    reserved_carts = cls._relevant_carts(user)

    # Category lines
    category_match = (
        Q(categories=F(
            'categories__product__productitem__product__category')) &
        Q(categories__product__productitem__cart__in=reserved_carts)
    )
    # Product lines
    product_match = (
        Q(products=F('products__productitem__product')) &
        Q(products__productitem__cart__in=reserved_carts)
    )

    return Case(
        When(category_match,
             then="categories__product__productitem__quantity"),
        When(product_match, then="products__productitem__quantity"),
        default=Value(0),
    )
def _calculate_quantities(cls, user):
    """Return an expression giving the discount quantity per joined item.

    Discount items in the carts returned by ``cls._relevant_carts(user)``
    yield their ``quantity``; all other rows evaluate to 0.
    """
    reserved_carts = cls._relevant_carts(user)
    return Case(
        When(
            discountitem__cart__in=reserved_carts,
            then="discountitem__quantity"
        ),
        default=Value(0)
    )
def user_remainders(cls, user):
    '''
    Return:
        Mapping[int->int]: A dictionary that maps the product ID to the
        user's remainder for that product.

    The remainder is ``limit_per_user`` minus the summed quantity of the
    product's items in the user's paid carts; products without a per-user
    limit get 99999999.
    '''
    in_users_paid_cart = (
        Q(productitem__cart__user=user) &
        Q(productitem__cart__status=commerce.Cart.STATUS_PAID)
    )
    # Items outside the user's paid carts contribute zero.
    bought = Case(
        When(in_users_paid_cart, then='productitem__quantity'),
        default=Value(0),
    )
    remainder = Case(
        When(limit_per_user=None, then=Value(99999999)),
        default=F('limit_per_user') - Sum(bought),
    )

    annotated = inventory.Product.objects.all().annotate(
        remainder=remainder)
    return {product.id: product.remainder for product in annotated}
def invoices(request, form):
    """Build the report that lists all of the invoices in the system.

    Invoices are ordered by status, then id. Neither ``request`` nor
    ``form`` is read here. Each row links to ``views.invoice``.
    """
    all_invoices = commerce.Invoice.objects.all().order_by("status", "id")
    columns = ["id", "recipient", "value", "get_status_display"]
    column_headings = ["id", "Recipient", "Value", "Status"]
    return QuerysetReport(
        "Invoices",
        columns,
        all_invoices,
        headings=column_headings,
        link_view=views.invoice,
    )
def speaker_registrations(request, form):
    """Show registration status for speakers with a given proposal kind.

    Speakers and co-speakers of the non-cancelled presentations whose
    proposal kind is among ``form.cleaned_data["kind"]`` are listed with
    ``paid_carts``, the number of their carts in the paid state, ordered
    by that number ascending.
    """
    kinds = form.cleaned_data["kind"]

    presentations = schedule_models.Presentation.objects.filter(
        proposal_base__kind__in=kinds,
    ).exclude(
        cancelled=True,
    )

    users = User.objects.filter(
        Q(speaker_profile__presentations__in=presentations) |
        Q(speaker_profile__copresentations__in=presentations)
    )

    paid_carts = commerce.Cart.objects.filter(status=commerce.Cart.STATUS_PAID)

    # 1 for every joined cart that is paid, 0 otherwise; summed per user.
    # Kept under its own name rather than rebinding ``paid_carts``.
    is_paid_cart = Case(
        When(cart__in=paid_carts, then=Value(1)),
        default=Value(0),
        output_field=models.IntegerField(),
    )
    users = users.annotate(paid_carts=Sum(is_paid_cart))
    users = users.order_by("paid_carts")

    # The unreachable ``return []`` that followed this return was dropped.
    return QuerysetReport(
        "Speaker Registration Status",
        ["id", "speaker_profile__name", "email", "paid_carts"],
        users,
        link_view=attendee,
    )
def get_queryset(self):
    """Get queryset with custom annotations.

    Adds to every session:

    * ``session_state`` -- ``'Up'``, ``'Down'``, ``'Admin Down'``,
      ``'Provisioning'`` or ``'None'``, derived from the provisioning,
      admin and operational states;
    * ``local_address`` / ``remote_address`` -- the IPv4 address when
      ``af`` is 1, the IPv6 address when ``af`` is 2, otherwise null;
    * ``address_family`` -- ``'IPv4'``, ``'IPv6'`` or ``'Unknown'``;
    * ``ixp_name``, ``router_hostname``, ``remote_network_name`` and
      ``remote_network_asn`` -- values copied from related objects.
    """
    base_query_set = super(PeeringSessionManager, self).get_queryset()

    def address_by_family(ipv4_path, ipv6_path):
        # Pick the address column that matches the session's family.
        return models.Case(
            models.When(af=1, then=models.F(ipv4_path)),
            models.When(af=2, then=models.F(ipv6_path)),
            default=None,
            output_field=IPAddressField()
        )

    # The state label is built inside-out, one nesting level at a time.
    operational_label = models.Case(
        models.When(operational_state=6, then=models.Value('Up')),
        default=models.Value('Down')
    )
    admin_label = models.Case(
        models.When(admin_state=2, then=operational_label),
        default=models.Value('Admin Down')
    )
    session_state = models.Case(
        models.When(provisioning_state=2, then=admin_label),
        models.When(provisioning_state=1, then=models.Value('Provisioning')),
        default=models.Value('None'),
        output_field=models.CharField()
    )
    address_family = models.Case(
        models.When(af=1, then=models.Value('IPv4')),
        models.When(af=2, then=models.Value('IPv6')),
        default=models.Value('Unknown'),
        output_field=models.CharField()
    )

    return base_query_set.annotate(
        session_state=session_state,
        local_address=address_by_family(
            'prngrtriface__netixlan__ipaddr4',
            'prngrtriface__netixlan__ipaddr6'),
        remote_address=address_by_family(
            'peer_netixlan__ipaddr4',
            'peer_netixlan__ipaddr6'),
        address_family=address_family,
        ixp_name=models.F('prngrtriface__netixlan__ixlan__ix__name'),
        router_hostname=models.F('prngrtriface__prngrtr__hostname'),
        remote_network_name=models.F('peer_netixlan__net__name'),
        remote_network_asn=models.F('peer_netixlan__net__asn')
    )
def setup_queryset(self, *args, **kwargs):
    """
    Store the annotated builds queryset on ``self.queryset``.

    The queryset is annotated so that it can be sorted by number of
    errors and number of warnings; but note that the criteria for finding
    the log messages to populate these fields should match those used in
    the Build model (orm/models.py) to populate the errors and warnings
    properties.
    """
    def count_messages(condition):
        # Only log messages matching the condition yield a value to count.
        return Count(
            Case(
                When(condition, then=Value(1)),
                output_field=IntegerField()
            )
        )

    builds = self.get_builds()

    # Don't include in-progress builds or cancelled builds.
    unfinished = Q(outcome=Build.IN_PROGRESS) | Q(outcome=Build.CANCELLED)
    builds = builds.exclude(unfinished)

    builds = builds.order_by(self.default_orderby)

    # ERROR, EXCEPTION and CRITICAL log messages all count as errors.
    is_error = (Q(logmessage__level=LogMessage.ERROR) |
                Q(logmessage__level=LogMessage.EXCEPTION) |
                Q(logmessage__level=LogMessage.CRITICAL))
    is_warning = Q(logmessage__level=LogMessage.WARNING)

    builds = builds.annotate(errors_no=count_messages(is_error))
    builds = builds.annotate(warnings_no=count_messages(is_warning))

    self.queryset = builds