我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用django.db.models.functions.Coalesce()。
def as_sql(self, bare_lookup, fallback=True): ''' Compose the sql lookup to get the value for this virtual field in a query. ''' language = self.get_language() if language == DEFAULT_LANGUAGE: return self._localized_lookup(language, bare_lookup) if not fallback: i18n_lookup = self._localized_lookup(language, bare_lookup) return Cast(i18n_lookup, self.output_field()) fallback_chain = get_fallback_chain(language) # first, add the current language to the list of lookups lookups = [self._localized_lookup(language, bare_lookup)] # and now, add the list of fallback languages to the lookup list for fallback_language in fallback_chain: lookups.append( self._localized_lookup(fallback_language, bare_lookup) ) return Coalesce(*lookups, output_field=self.output_field())
def for_user(self, user, start, end): """Get employments in given time frame for current user. This includes overlapping employments. :param User user: The user of the searched employments :param datetime.date start: start of time frame :param datetime.date end: end of time frame :returns: queryset of employments """ # end date NULL on database is like employment is ending today queryset = self.annotate( end=functions.Coalesce('end_date', models.Value(date.today())) ) return queryset.filter( user=user ).exclude( models.Q(end__lt=start) | models.Q(start_date__gt=end) )
def count_votes(self): """ Returns questions annotated with the number of votes they have. """ return self.annotate(num_votes=Coalesce(models.Sum('votes__vote'), 0))
def count_votes(self): """ Returns questions annotated with the number of votes they have. """ return self.annotate(num_votes=Coalesce(models.Sum('votes__vote'), 0)) ########################################################################## ## Answer Manager ##########################################################################
def output_field(self): ''' The type of field used to Cast/Coalesce to. Mainly because a max_length argument is required for CharField until this PR is merged: https://github.com/django/django/pull/8758 ''' Field = self.original_field.__class__ if isinstance(self.original_field, fields.CharField): return Field(max_length=self.original_field.max_length) return Field()
def filter_date(self, queryset, name, value): queryset = queryset.annotate( end=Coalesce('end_date', Value(date.today())) ) queryset = queryset.filter( start_date__lte=value, end__gte=value ) return queryset
def validate(self, data): """Validate the employment as a whole. Ensure the end date is after the start date and there is only one active employment per user and there are no overlapping employments. :throws: django.core.exceptions.ValidationError :return: validated data :rtype: dict """ instance = self.instance start_date = data.get('start_date', instance and instance.start_date) end_date = data.get('end_date', instance and instance.end_date) if end_date and start_date >= end_date: raise ValidationError(_( 'The end date must be after the start date' )) user = data.get('user', instance and instance.user) employments = models.Employment.objects.filter(user=user) # end date not set means employment is ending today end_date = end_date or date.today() employments = employments.annotate( end=Coalesce('end_date', Value(date.today())) ) if instance: employments = employments.exclude(id=instance.id) if any([ e.start_date <= end_date and start_date <= e.end for e in employments ]): raise ValidationError(_( 'A user can\'t have multiple employments at the same time' )) return data
def get_queryset(self): qs = super().get_queryset() qs = qs.annotate(score=Coalesce(Avg('vote__vote'), 0)) return qs
def query_sum(queryset, field): return queryset.aggregate(s=Coalesce(Sum(field), 0))['s']
def __init__(self, *args, **kwargs): super(SelectTowerForm, self).__init__(*args, **kwargs) # We create the choice field here in the init so that if the network # values change, the form will pick up the changes and not require the # server to be restarted. choices = [] # We create a convoluted tower queryset so that towers that have never # synced (last_active = None) sort after active and inactive towers. the_past = datetime.datetime.now() - datetime.timedelta(days=10*365) all_towers = models.BTS.objects.all().annotate( new_last_active=Coalesce('last_active', Value(the_past))).order_by( '-new_last_active') for tower in all_towers: value = tower.id user_profile = models.UserProfile.objects.get( network=tower.network) abbreviated_uuid = tower.uuid[0:5] if tower.nickname: prefix = 'Tower "%s" - %s..' % ( tower.nickname, abbreviated_uuid) else: prefix = 'Tower %s..' % abbreviated_uuid display = '%s (%s)' % (prefix, user_profile.user.email) choices.append((value, display)) self.fields['tower'] = forms.ChoiceField( label="Tower", choices=choices, required=False) # Set layout attributes. self.helper = FormHelper() self.helper.form_id = 'select-tower-form' self.helper.form_method = 'post' self.helper.form_action = '/dashboard/staff/tower-monitoring' self.helper.add_input(Submit('submit', 'Select')) self.helper.layout = Layout('tower')
def get(self, request): """"Handles GET requests.""" user_profile = models.UserProfile.objects.get(user=request.user) if not user_profile.user.is_staff: return response.Response('', status=status.HTTP_404_NOT_FOUND) # We create a convoluted queryset so that towers that have never synced # (last_active = None) sort after active and inactive towers. the_past = datetime.datetime.now() - datetime.timedelta(days=10*365) towers = models.BTS.objects.all().annotate( new_last_active=Coalesce('last_active', Value(the_past))).order_by( '-new_last_active') # Attach UserProfiles to each tower in the queryset. for tower in towers: tower_user_profiles = models.UserProfile.objects.filter( network=tower.network) for tower_user_profile in tower_user_profiles: if hasattr(tower, 'user_email'): tower.user_email += ',' + tower_user_profile.user.email else: tower.user_email = tower_user_profile.user.email # Configure the table of towers. tower_table = django_tables.StaffTowerTable(list(towers)) towers_per_page = 8 paginate = False if towers.count() > towers_per_page: paginate = {'per_page': towers_per_page} tables.RequestConfig(request, paginate=paginate).configure( tower_table) context = { 'networks': get_objects_for_user(request.user, 'view_network', klass=models.Network), 'user_profile': user_profile, 'towers': towers, 'tower_table': tower_table, } # Render the template. towers_template = template.loader.get_template( 'dashboard/staff/towers.html') html = towers_template.render(context, request) return http.HttpResponse(html)
def calculate_products_cost(self): """ ??????? ????????? ??????? """ return self.records.aggregate( cost=Coalesce(models.Sum( models.F('order_price') * models.F('count'), output_field=ValuteField() ), 0) )['cost']
def get_rating(): return { 'count': RatingVote.objects.count(), 'avg': RatingVote.objects.aggregate(rating=Coalesce(models.Avg('rating'), 0))['rating'] }
def save(self, *args, **kwargs): is_add = not self.pk if is_add: self.self_type = ContentType.objects.get_for_model(type(self), for_concrete_model=False) self.created = now() self.sort_order = self.gallery.items.aggregate( max=Coalesce(models.Max('sort_order'), 0) + 1 )['max'] super().save(*args, **kwargs)
def get_queryset(self, request): return ( super().get_queryset(request) .annotate(minutes=Coalesce(Sum('entries__minutes'), 0)) .annotate(last_log=Max('entries__updated')) )
def get_total_price(cls, year, month): total_price = cls.objects.filter(created_at__year=year, created_at__month=month).aggregate(total=Coalesce(Sum('price'), 0)) return total_price['total']