我们从 Python 开源项目中，提取了以下 26 个代码示例，用于说明如何使用 django.db.models.aggregates.Sum()。
def check_expression_support(self, expression):
    """Reject aggregates that cannot be computed over date/time fields.

    Raises NotImplementedError when ``expression`` is a Sum, Avg, Variance
    or StdDev aggregate and one of its source expressions resolves to a
    DateField, DateTimeField or TimeField.
    """
    unsupported_aggregates = (
        aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev,
    )
    if not isinstance(expression, unsupported_aggregates):
        return

    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    for source in expression.get_source_expressions():
        try:
            resolved_field = source.output_field
        except FieldError:
            # Some subexpressions have no output_field; skipping them is fine.
            continue
        if isinstance(resolved_field, temporal_fields):
            raise NotImplementedError(
                'You cannot use Sum, Avg, StdDev, and Variance '
                'aggregations on date/time fields in sqlite3 '
                'since date/time is saved as text.'
            )
def gap(self):
    """Return this participant's gap to the session leader, or None.

    The leader is the participant with ``race_position == 1`` in the same
    session. When the session stage name starts with "Race", or the session
    is finished, the gap is the difference in ``total_time``; otherwise it
    is the difference in ``fastest_lap_time``.

    Returns None for the leader itself, when this participant has no
    (truthy) time for the relevant measure, or when the leader's time is
    not recorded.
    """
    if self.race_position == 1:
        return None

    leader = Participant.objects.get(session=self.session, race_position=1)

    session = self.session
    if session.session_stage.name.startswith("Race") or session.finished:
        own_time, leader_time = self.total_time, leader.total_time
    else:
        own_time, leader_time = self.fastest_lap_time, leader.fastest_lap_time

    # Without both times there is nothing to subtract; previously a leader
    # with no recorded time raised TypeError here.
    if not own_time or leader_time is None:
        return None
    return own_time - leader_time
def data(self):
    """Yield one chart series per user that has entries in the date range.

    Users are those flagged ``is_current`` whose id appears among the
    entries with ``day`` between ``self.start`` and ``self.end`` (both
    inclusive). Each yielded dict holds a random ``rgb(...)`` colour, the
    extrapolated data points and a display name (first name plus the
    initial of the last name, when there is one).
    """
    user_ids_with_entries = (
        Entry.objects
        .filter(day__gte=self.start, day__lte=self.end)
        .order_by('user_id')
        .values_list('user_id', flat=True)
        .distinct()
    )
    candidates = User.objects.filter(is_current=True, id__in=user_ids_with_entries)

    for user in candidates:
        series = self.get_query(user=user).annotate(minutes=Sum('minutes'))

        red, green, blue = randint(1, 255), randint(1, 255), randint(1, 255)
        points = list(self.extrapolate(series))

        label = user.first_name
        if user.last_name:
            label = label + (' ' + user.last_name[0] + '.')

        yield {
            'color': 'rgb({}, {}, {})'.format(red, green, blue),
            'data': points,
            'name': label,
        }
def range_statistics(start, end):
    """Return aggregated day statistics for ``start <= day < end``.

    The result is the dict produced by a single ``aggregate()`` call: sums
    of the cost and consumption columns, merged (tariff 1 + tariff 2)
    electricity sums, and the minimum, maximum and average temperature.
    """
    days_in_range = DayStatistics.objects.filter(day__gte=start, day__lt=end)

    # Keyword order is kept as-is so the resulting dict has the same layout.
    aggregations = {
        'total_cost': Sum('total_cost'),
        'electricity1': Sum('electricity1'),
        'electricity1_cost': Sum('electricity1_cost'),
        'electricity1_returned': Sum('electricity1_returned'),
        'electricity2': Sum('electricity2'),
        'electricity2_cost': Sum('electricity2_cost'),
        'electricity2_returned': Sum('electricity2_returned'),
        'electricity_merged': Sum(
            models.F('electricity1') + models.F('electricity2')
        ),
        'electricity_cost_merged': Sum(
            models.F('electricity1_cost') + models.F('electricity2_cost')
        ),
        'electricity_returned_merged': Sum(
            models.F('electricity1_returned') + models.F('electricity2_returned')
        ),
        'gas': Sum('gas'),
        'gas_cost': Sum('gas_cost'),
        'temperature_min': Min('lowest_temperature'),
        'temperature_max': Max('highest_temperature'),
        'temperature_avg': Avg('average_temperature'),
    }
    return days_in_range.aggregate(**aggregations)
def channel_new_messages_annotation(user):
    """Build a ``Sum`` annotation that counts new channel messages for a user.

    The queryset being annotated must already carry a ``channel_last_read``
    annotation, because the condition compares against it.

    An action target counts as 1 when its ``actor_object_id`` differs from
    ``user.id``, it compares greater than ``channel_last_read`` and its verb
    is SEND or UPLOAD; everything else counts as 0.

    :param user: object whose ``id`` identifies the reader
    :return: a ``Sum`` expression with an integer output field
    """
    sent_by_someone_else = ~Q(action_targets__actor_object_id=user.id)
    after_last_read = Q(action_targets__gt=F('channel_last_read'))
    is_message_verb = Q(action_targets__verb__in=[verbs.SEND, verbs.UPLOAD])

    counts_as_new = When(
        sent_by_someone_else & after_last_read & is_message_verb,
        then=1
    )
    return Sum(Case(counts_as_new, default=0, output_field=IntegerField()))
def data(self):
    """Return the query for ``self.user`` annotated with summed ``minutes``."""
    user_query = self.get_query(user=self.user)
    summed_minutes = Sum('minutes')
    return user_query.annotate(minutes=summed_minutes)
def data(self):
    """Return the base query annotated with summed minutes and user count.

    ``minutes`` is the sum of the ``minutes`` column and ``users`` is the
    number of distinct ``user`` values.
    """
    base_query = self.get_query()
    summed_minutes = Sum('minutes')
    distinct_users = Count('user', distinct=True)
    return base_query.annotate(minutes=summed_minutes, users=distinct_users)
def get_queryset(self, request):
    """Return the parent queryset annotated with logged minutes and last log.

    ``minutes`` is the sum of ``entries__minutes`` (0 when there is nothing
    to sum) and ``last_log`` is the latest ``entries__updated`` value.
    """
    base_queryset = super().get_queryset(request)
    total_minutes = Coalesce(Sum('entries__minutes'), 0)
    with_minutes = base_queryset.annotate(minutes=total_minutes)
    return with_minutes.annotate(last_log=Max('entries__updated'))
def check_expression_support(self, expression):
    """Reject aggregates that cannot be computed over date/time fields.

    Raises NotImplementedError when ``expression`` is a Sum, Avg, Variance
    or StdDev aggregate whose input field resolves to a DateField,
    DateTimeField or TimeField.
    """
    unsupported_aggregates = (
        aggregates.Sum, aggregates.Avg, aggregates.Variance, aggregates.StdDev,
    )
    if not isinstance(expression, unsupported_aggregates):
        return

    try:
        resolved_field = expression.input_field.output_field
    except FieldError:
        # Not every sub-expression has an output_field; that is fine to
        # ignore.
        return

    temporal_fields = (fields.DateField, fields.DateTimeField, fields.TimeField)
    if isinstance(resolved_field, temporal_fields):
        raise NotImplementedError(
            'You cannot use Sum, Avg, StdDev and Variance aggregations '
            'on date/time fields in sqlite3 '
            'since date/time is saved as text.')
def get_changeform_initial_data(self, request):
    """Prefill ``loan_amount`` with the sum of a user's existing loans.

    The user is taken from the ``obj`` attribute of the app/model info
    resolved from the request when that attribute is present and truthy,
    otherwise from ``request.user``. The value falls back to 0.00 when the
    aggregate is empty or zero.
    """
    model_info = generic.get_app_model_info_from_request(request)
    related = getattr(model_info, 'obj', None)

    borrower = request.user
    if related:
        borrower = related.user

    totals = Loan.objects.filter(user=borrower).aggregate(Sum('loan_amount'))
    loan_total = totals.get('loan_amount__sum') or 0.00
    return {'loan_amount': loan_total}
def collection_amount(self):
    """Return the summed ``collection_amount`` of this object's collections.

    Falls back to 0.00 when the aggregate is empty or zero.
    """
    collections = PaymentCollection.objects.filter(so=self)
    totals = collections.aggregate(Sum('collection_amount'))
    return totals.get('collection_amount__sum') or 0.00
def invoice_amount(self):
    """Return the summed ``vo_amount`` of invoices linked to this object.

    Falls back to 0.00 when the aggregate is empty or zero.
    """
    invoices = Invoice.objects.filter(po=self)
    totals = invoices.aggregate(Sum('vo_amount'))
    return totals.get('vo_amount__sum') or 0.00
def payed_amount(self):
    """Return the summed ``py_amount`` of payments linked to this object.

    Falls back to 0.00 when the aggregate is empty or zero.
    """
    payments = Payment.objects.filter(po=self)
    totals = payments.aggregate(Sum('py_amount'))
    return totals.get('py_amount__sum') or 0.00
def electricity_tariff_percentage(start_date):
    """Return each tariff's share of total electricity consumption.

    Sums ``electricity1`` and ``electricity2`` over all day statistics from
    ``start_date`` onwards. The result maps ``electricity1`` to its
    percentage of the combined total, rounded up, and ``electricity2`` to
    the remainder up to 100. Returns None when either sum is missing or
    zero.
    """
    consumption = DayStatistics.objects.filter(day__gte=start_date).aggregate(
        electricity1=Sum('electricity1'),
        electricity2=Sum('electricity2'),
    )
    tariff1 = consumption['electricity1']
    tariff2 = consumption['electricity2']

    # Empty data would crash the division below.
    if not (tariff1 and tariff2):
        return None

    combined = tariff1 + tariff2
    tariff1_share = math.ceil(tariff1 / combined * 100)
    return {
        'electricity1': tariff1_share,
        'electricity2': 100 - tariff1_share,
    }
def get_initial_manpower(self):
    """Return the summed ``starting_manpower`` of this organization's units.

    The value is whatever the aggregate yields for the units whose
    ``battle_organization`` is this object.
    """
    units = BattleUnit.objects.filter(battle_organization=self)
    totals = units.aggregate(Sum('starting_manpower'))
    return totals['starting_manpower__sum']
def get_total_price(cls, year, month):
    """Return the summed ``price`` of rows created in the given year/month.

    The sum is coalesced to 0 when there is nothing to add up.
    """
    created_in_month = cls.objects.filter(
        created_at__year=year,
        created_at__month=month,
    )
    price_sum = Coalesce(Sum('price'), 0)
    return created_in_month.aggregate(total=price_sum)['total']
def timeseries(self, kind=None, **kwargs):
    """Get GPRS timeseries.

    Wraps ``aggregate_timeseries`` with some filtering capabilities. GPRS
    usage events are all queried under the "gprs" key; uploaded and
    downloaded byte counts are selected through the ``aggregation``
    argument and converted with ``convert_to_megabytes``.

    Args:
      kind: the kind of GPRS usage to query for. Valid values are
            'downloaded_data', 'uploaded_data' and the default,
            'total_data' (``None`` means the same), which returns the sum
            of downloaded and uploaded.

    Keyword Args:
      start_time_epoch, end_time_epoch, interval: passed on to
          ``aggregate_timeseries`` (defaults: 0, -1 and 'months'). Any
          other keyword argument is ignored.

    Returns:
      For 'uploaded_data' / 'downloaded_data', the converted series as
      returned by ``convert_to_megabytes``. For the total, a list of
      ``(date, uploaded + downloaded)`` pairs. ``None`` for any other
      ``kind``.
    """
    query_window = {
        'interval': kwargs.pop('interval', 'months'),
        'start_time_epoch': kwargs.pop('start_time_epoch', 0),
        'end_time_epoch': kwargs.pop('end_time_epoch', -1),
    }

    def usage_in_megabytes(aggregation):
        # One query + conversion for a single byte-count column.
        raw_usage = self.aggregate_timeseries(
            'gprs', aggregation=aggregation, **query_window)
        return self.convert_to_megabytes(raw_usage)

    if kind == 'uploaded_data':
        return usage_in_megabytes('up_byte_count')
    if kind == 'downloaded_data':
        return usage_in_megabytes('down_byte_count')
    if kind not in (None, 'total_data'):
        # Unknown kinds have always yielded None; keep that contract.
        return None

    uploaded_usage = usage_in_megabytes('up_byte_count')
    downloaded_usage = usage_in_megabytes('down_byte_count')
    # The dates are the same for uploaded and downloaded, so the uploaded
    # dates are used. A concrete list is returned (not a lazy zip object) so
    # the result can be indexed, re-iterated and serialized like the
    # single-direction series.
    return [
        (uploaded[0], uploaded[1] + downloaded[1])
        for uploaded, downloaded in zip(uploaded_usage, downloaded_usage)
    ]
def handle(self, *args, **options):
    """Send new message notifications to support customers.

    Selects support channels (no creator, content type of ``Inquirer``)
    that have at least one new message, emails the channel's customer the
    unread messages and, when the mail is sent, advances the channel's
    ``last_read`` marker to the latest message id.
    """
    # command to run: python manage.py tunga_send_customer_emails
    # NOTE(review): the window below is 1 minute, while the previous inline
    # comment described a 5 minute window -- confirm which one is intended.
    cutoff = datetime.datetime.utcnow() - relativedelta(minutes=1)

    # An action target counts as new when it has a different actor content
    # type than the channel, compares greater than last_read, is at or
    # before the cutoff and is a SEND or UPLOAD.
    is_new_message = When(
        ~Q(action_targets__actor_content_type=F('content_type')) &
        Q(action_targets__gt=F('last_read')) &
        Q(action_targets__timestamp__lte=cutoff) &
        Q(action_targets__verb__in=[verbs.SEND, verbs.UPLOAD]),
        then=1
    )
    new_message_count = Sum(
        Case(is_new_message, default=0, output_field=IntegerField())
    )

    support_channels = Channel.objects.filter(
        type=CHANNEL_TYPE_SUPPORT,
        created_by__isnull=True,
        content_type=ContentType.objects.get_for_model(Inquirer)
    )
    channels_with_news = support_channels.annotate(
        new_messages=new_message_count,
        latest_message=Max('action_targets__id')
    ).filter(new_messages__gt=0)

    for channel in channels_with_news:
        customer = channel.content_object
        if not customer.email:
            continue

        sent_actions = Action.objects.filter(
            channels=channel, id__gt=channel.last_read, verb__in=[verbs.SEND]
        ).order_by('id')
        messages = [action.action_object for action in sent_actions]
        if not messages:
            continue

        recipients = [customer.email]
        context = {
            'customer': customer,
            'count': channel.new_messages,
            'messages': messages,
            'channel_url': '%s/customer/help/%s/' % (TUNGA_URL, channel.id)
        }
        was_sent = send_mail(
            "[Tunga Support] Help",
            'tunga/email/unread_help_messages',
            recipients,
            context
        )
        if was_sent:
            # Only mark messages as read once the email actually went out.
            channel.last_read = channel.latest_message
            channel.save()
def handle(self, *args, **options):
    """Send new message notifications to channel members.

    Selects channel memberships that were never emailed, or not within the
    last 3 hours, and that have at least one new message. Each selected
    user gets one email per channel and, when the mail is sent, the
    membership's ``last_email_at`` is set to the current UTC time.
    """
    # command to run: python manage.py tunga_send_message_emails
    utc_now = datetime.datetime.utcnow()
    min_date = utc_now - relativedelta(minutes=15)  # 15 minute window to read new messages
    min_last_email_date = utc_now - relativedelta(hours=3)  # Limit to 1 email every 3 hours per channel
    commission_date = parse('2016-08-08 00:00:00')  # Don't notify about events before the commissioning date

    # An action target counts as new when it was not triggered by the user,
    # compares greater than last_read, falls between the commissioning date
    # and the 15 minute cutoff, is newer than the last email (if any) and is
    # a SEND or UPLOAD.
    is_new_message = When(
        ~Q(channel__action_targets__actor_object_id=F('user_id')) &
        Q(channel__action_targets__gt=F('last_read')) &
        Q(channel__action_targets__timestamp__lte=min_date) &
        Q(channel__action_targets__timestamp__gte=commission_date) &
        (Q(last_email_at__isnull=True) | Q(channel__action_targets__timestamp__gt=F('last_email_at'))) &
        Q(channel__action_targets__verb__in=[verbs.SEND, verbs.UPLOAD]),
        then=1
    )

    user_channels = ChannelUser.objects.filter(
        Q(last_email_at__isnull=True) | Q(last_email_at__lt=min_last_email_date)
    ).annotate(
        new_messages=Sum(
            Case(is_new_message, default=0, output_field=IntegerField())
        )
    ).filter(new_messages__gt=0)

    for user_channel in user_channels:
        channel_name = user_channel.channel.get_channel_display_name(user_channel.user)
        to = [user_channel.user.email]

        if user_channel.channel.type == CHANNEL_TYPE_DIRECT:
            # A conditional expression is required here: the old
            # ``cond and '' or 's'`` form always produced 's' because the
            # empty string is falsy, so one message was announced as
            # "New messages".
            plural_suffix = '' if user_channel.new_messages == 1 else 's'
            subject = "New message{} from {}".format(plural_suffix, channel_name)
        else:
            subject = "Conversation: {}".format(channel_name)

        ctx = {
            'receiver': user_channel.user,
            'new_messages': user_channel.new_messages,
            'channel_name': channel_name,
            'channel': user_channel.channel,
            'channel_url': '%s/conversation/%s/' % (TUNGA_URL, user_channel.channel.id)
        }

        if send_mail(subject, 'tunga/email/unread_channel_messages', to, ctx):
            # Record the send time so the 3 hour throttle applies next run.
            user_channel.last_email_at = datetime.datetime.utcnow()
            user_channel.save()