我们从 Python 开源项目中提取了以下 13 个代码示例，用于说明如何使用 django.db.models.aggregates.Max()。
def save(self, *args, **kwargs):
    """Normalise the content's derived fields, then persist it.

    Raises:
        ValueError: if the content has both a ``parent`` and a ``share_of``.
    """
    if self.parent and self.share_of:
        raise ValueError("Can't be both a reply and a share!")
    self.cache_data()

    if self.parent:
        self.content_type = ContentType.REPLY
        # Replies inherit the parent's visibility and are never pinned.
        self.visibility = self.parent.visibility
        self.pinned = False
    elif self.share_of:
        self.content_type = ContentType.SHARE

    is_new = not self.pk
    if is_new and not self.guid:
        self.guid = uuid4()
    if is_new and self.pinned:
        # Place newly pinned content after the author's highest-ordered
        # top-level content.
        own_top_level = Content.objects.top_level().filter(author=self.author)
        highest_order = own_top_level.aggregate(Max("order"))["order__max"]
        # A None maximum most likely means the author has no content yet,
        # in which case the current order value is left untouched.
        if highest_order is not None:
            self.order = highest_order + 1

    self.fix_local_uploads()
    super().save(*args, **kwargs)
    self.cache_related_object_data()
def search_shops_on_forum(force=False):
    """Crawl the PrestaShop forum member list and store signature links.

    First walks the paginated member list, creating a ``Member`` per profile
    link, then opens every member page from the starting page onwards and
    stores each signature link as a ``ShopLink``.

    Args:
        force: when true, restart from the first list page instead of
            resuming from the highest ``page_number`` already stored.
    """
    # Get member pages
    step = 500
    # aggregate() returns a dict, which is always truthy, so the stored
    # maximum has to be read out of it explicitly; it is None when no
    # members exist yet.
    max_page = Member.objects.aggregate(Max('page_number'))['page_number__max']
    last_page = page_number = 1 if force or max_page is None else max_page
    page_url = 'http://www.prestashop.com/forums/members/page__sort_key__members_display_name__sort_order__asc__max_results__%d__st__%d' % (step, (last_page - 1) * step)
    while page_url:
        page = document_fromstring(urllib2.urlopen(page_url).read())
        # NOTE(review): ':first' looks like jQuery syntax rather than standard
        # CSS -- confirm the selector engine in use accepts it.
        for member in page.cssselect('ul.members li h3.bar a:first'):
            # member url
            Member.objects.get_or_create(link=member.get('href'), defaults={'page_number': page_number})
        # cssselect() returns a list of elements; a missing "next" link means
        # this was the last page, which ends the loop.
        next_links = page.cssselect('ul.pagination.left li.next a')
        page_url = next_links[0].get('href') if next_links else None
        page_number += 1

    for member in Member.objects.filter(page_number__gte=last_page):
        member_page = document_fromstring(urllib2.urlopen(member.link).read())
        for link in member_page.cssselect('div.general_box div.signature a'):
            ShopLink.objects.get_or_create(link=link.get('href'), member=member)
def search_shops_on_rus_forum(force=False, end_profile_id=4219):
    """Crawl prestadev.ru profile pages and store the links found on them.

    Profiles are fetched by numeric id. For each profile that can be parsed a
    ``MemberRus`` is fetched or created, and every matching link on the page
    is stored as a ``ShopLinkRus``.

    Args:
        force: when true, start from profile id 1 instead of resuming from
            the highest ``page_number`` already stored.
        end_profile_id: exclusive upper bound of the profile ids to visit.
    """
    # aggregate() returns a dict, which is always truthy, so the stored
    # maximum has to be read out of it explicitly; it is None when no
    # members exist yet.
    max_page = MemberRus.objects.aggregate(Max('page_number'))['page_number__max']
    last_page = 1 if force or max_page is None else max_page
    for i in range(last_page, end_profile_id):
        page_url = 'http://prestadev.ru/forum/profile.php?u=' + str(i)
        page = document_fromstring(urllib2.urlopen(page_url).read())
        try:
            messages = int(page.cssselect('div.wttborder td strong')[2].text.strip())
        except (IndexError, AttributeError, ValueError):
            # Best effort: a missing, empty or non-numeric counter is stored
            # as zero messages.
            messages = 0
        try:
            # NOTE(review): 'home_page' receives the selected element itself,
            # not its text or href -- confirm this is what the model expects.
            params = {'title': page.cssselect('#profilename')[0].text.strip(),
                      'messages': messages,
                      'page_number': i,
                      'home_page': page.cssselect('div.wttborder td.row1')[4]}
        except IndexError:
            # The page lacks the expected profile elements; skip this id.
            continue
        member = MemberRus.objects.get_or_create(**params)[0]
        for link in page.cssselect('div.wgborder td.row1 a'):
            ShopLinkRus.objects.get_or_create(link=link.get('href'), member=member)
def range_statistics(start, end):
    """Aggregate the day statistics with ``start <= day < end``.

    Returns the dict produced by ``aggregate()``: summed costs and
    consumption, the merged electricity sums, and the minimum, maximum and
    average temperatures.
    """
    days_in_range = DayStatistics.objects.filter(day__gte=start, day__lt=end)
    aggregates = {
        'total_cost': Sum('total_cost'),
        'electricity1': Sum('electricity1'),
        'electricity1_cost': Sum('electricity1_cost'),
        'electricity1_returned': Sum('electricity1_returned'),
        'electricity2': Sum('electricity2'),
        'electricity2_cost': Sum('electricity2_cost'),
        'electricity2_returned': Sum('electricity2_returned'),
        # The "merged" values add both electricity fields per row before summing.
        'electricity_merged': Sum(models.F('electricity1') + models.F('electricity2')),
        'electricity_cost_merged': Sum(models.F('electricity1_cost') + models.F('electricity2_cost')),
        'electricity_returned_merged': Sum(
            models.F('electricity1_returned') + models.F('electricity2_returned')
        ),
        'gas': Sum('gas'),
        'gas_cost': Sum('gas_cost'),
        'temperature_min': Min('lowest_temperature'),
        'temperature_max': Max('highest_temperature'),
        'temperature_avg': Avg('average_temperature'),
    }
    return days_in_range.aggregate(**aggregates)
def annotate_channel_queryset_with_latest_activity_at(queryset, user):
    """Annotate each row with ``latest_activity_at``.

    ``latest_activity_timestamp`` is the maximum ``action_targets__timestamp``.
    ``latest_activity_at`` is that timestamp when it is later than
    ``created_at``, and ``created_at`` otherwise (including when there is no
    timestamp at all). ``user`` is accepted but not used here.
    """
    newest_action_time = Max('action_targets__timestamp')
    effective_activity_time = Case(
        When(latest_activity_timestamp__isnull=True, then='created_at'),
        When(
            latest_activity_timestamp__gt=F('created_at'),
            then='latest_activity_timestamp'
        ),
        default='created_at',
        output_field=DateTimeField()
    )
    with_timestamp = queryset.annotate(latest_activity_timestamp=newest_action_time)
    return with_timestamp.annotate(latest_activity_at=effective_activity_time)
def max_pk():
    """Returns the sum of all the highest PKs for each submodel.

    A submodel without rows contributes 0, and the result is 0 when
    ``AbstractPage`` has no subclasses.
    """
    # An explicit alias is read by key rather than indexing dict.values(),
    # which is not subscriptable on Python 3. sum() also copes with an empty
    # subclass list, where reduce() without an initial value raises.
    return sum(
        int(model.objects.aggregate(highest=Max('pk'))['highest'] or 0)
        for model in AbstractPage.__subclasses__()
    )
def get(self, request):
    """Render the user's online history, or the authentication page.

    Authenticated users get "index.html" with one row per date carrying the
    earliest and latest ``time``, newest date first. Everyone else gets a
    fresh verification token stored in the session and "authentication.html".
    """
    # The templates receive locals(), so local variable names here are part
    # of the template context and must not be renamed.
    # NOTE(review): is_authenticated is called as a method; confirm this
    # matches the Django version in use.
    if not request.user.is_authenticated():
        verification_token = str(uuid.uuid4())
        request.session["verification_token"] = verification_token
        return render(request, "authentication.html", locals())

    histories = (
        OnlineHistory.objects.filter(mac=request.user.username)
        .values('date')
        .annotate(min_time=Min('time'), max_time=Max('time'))
        .order_by("-date")
    )
    return render(request, "index.html", locals())
def get(self, request):
    """Render the staff overview of online history, or a message page.

    Users holding "mobile_scanner.view_staffonlinehistory" get "staff.html"
    with one row per date and user name carrying the earliest and latest
    ``time``, newest date first. Everyone else gets "msg.html".
    """
    # The templates receive locals(), so local variable names here are part
    # of the template context and must not be renamed.
    if not request.user.has_perm("mobile_scanner.view_staffonlinehistory"):
        msg = "?????"
        return render(request, "msg.html", locals())

    histories = (
        OnlineHistory.objects.filter()
        .values('date', 'user__last_name', 'user__first_name')
        .annotate(min_time=Min('time'), max_time=Max('time'))
        .order_by("-date")
    )
    return render(request, "staff.html", locals())
def get_queryset(self, request):
    """Return the parent queryset annotated with ``minutes`` and ``last_log``.

    ``minutes`` is the sum of ``entries__minutes`` (0 when the sum is NULL);
    ``last_log`` is the maximum ``entries__updated``.
    """
    queryset = super().get_queryset(request)
    queryset = queryset.annotate(minutes=Coalesce(Sum('entries__minutes'), 0))
    return queryset.annotate(last_log=Max('entries__updated'))
def data(self):
    """Return this product's query annotated with ``complete`` = Max('complete')."""
    product_query = self.get_query(product=self.product)
    return product_query.annotate(complete=Max('complete'))


# Disabled wiki statistics helpers, kept commented out as in the original.
#
# def wiki_registrations():
#     return (
#         MediaWikiUser.objects
#         .annotate(year=Substr('registration', 1, 4))
#         .order_by('year')
#         .values('year')
#         .annotate(
#             users=Count('id', distinct=True)
#         )
#     )
#
#
# def wiki_revisions():
#     return (
#         Revision.objects
#         .annotate(year=Substr('rev_timestamp', 1, 4))
#         .order_by('year')
#         .values('year')
#         .annotate(
#             revisions=Count('id')
#         )
#     )
#
#
# def wiki_registrations_year(year):
#     return (
#         MediaWikiUser.objects
#         .annotate(
#             year=Substr('registration', 1, 4),
#             month=Substr('registration', 5, 2)
#         )
#         .filter(year=str(year).encode())
#         .order_by('month')
#         .values('month')
#         .annotate(
#             users=Count('id', distinct=True)
#         )
#     )
#
#
# def wiki_revisions_year(year):
#     return (
#         Revision.objects
#         .annotate(
#             year=Substr('rev_timestamp', 1, 4),
#             month=Substr('rev_timestamp', 5, 2)
#         )
#         .filter(year=str(year).encode())
#         .order_by('month')
#         .values('month')
#         .annotate(
#             revisions=Count('id')
#         )
#     )
def handle(self, *args, **options):
    """
    Send new message notifications

    Finds support channels of inquirers that have unread messages, e-mails
    each customer who has an address, and advances the channel's
    ``last_read`` marker when the mail was sent.
    """
    # command to run: python manage.py tunga_send_customer_emails
    # NOTE(review): the original comment described a "5 minute window to read
    # new messages" while the code subtracts one minute -- confirm which one
    # is intended.
    min_date = datetime.datetime.utcnow() - relativedelta(minutes=1)

    # An action counts as a new message when its actor is not of the
    # channel's own content type, it is newer than last_read, it is at least
    # as old as min_date and it is a send or an upload.
    is_new_message = (
        ~Q(action_targets__actor_content_type=F('content_type')) &
        Q(action_targets__gt=F('last_read')) &
        Q(action_targets__timestamp__lte=min_date) &
        Q(action_targets__verb__in=[verbs.SEND, verbs.UPLOAD])
    )
    new_message_count = Sum(
        Case(
            When(is_new_message, then=1),
            default=0,
            output_field=IntegerField()
        )
    )
    support_channels = Channel.objects.filter(
        type=CHANNEL_TYPE_SUPPORT,
        created_by__isnull=True,
        content_type=ContentType.objects.get_for_model(Inquirer)
    )
    channels_with_news = support_channels.annotate(
        new_messages=new_message_count,
        latest_message=Max('action_targets__id')
    ).filter(new_messages__gt=0)

    for channel in channels_with_news:
        customer = channel.content_object
        if not customer.email:
            continue

        unread_actions = Action.objects.filter(
            channels=channel, id__gt=channel.last_read, verb__in=[verbs.SEND]
        ).order_by('id')
        unread_messages = [action.action_object for action in unread_actions]
        if not unread_messages:
            continue

        recipients = [customer.email]
        subject = "[Tunga Support] Help"
        context = {
            'customer': customer,
            'count': channel.new_messages,
            'messages': unread_messages,
            'channel_url': '%s/customer/help/%s/' % (TUNGA_URL, channel.id)
        }
        # last_read only moves forward when send_mail returns a truthy value.
        if send_mail(subject, 'tunga/email/unread_help_messages', recipients, context):
            channel.last_read = channel.latest_message
            channel.save()
def update_task_pm_updates(task):
    """Schedule the next 'PM Report' progress event for a task's project.

    Sub-tasks are handled on their parent. Nothing happens unless the target
    is open, is not flagged ``is_task``, is approved, has a ``pm`` and has an
    update interval configured. Candidate dates alternate between Thursday
    and Monday; the first one that is not in the past is scheduled only if
    it lies within the next 18 hours, before the deadline (when there is
    one), and on a day without an existing PM event.
    """
    task = clean_instance(task, Task)

    # for sub-tasks, create all pm updates on the project
    target_task = task.parent if task.parent else task

    # only request pm updates for project which are approved and not closed
    eligible = (
        not target_task.closed and not target_task.is_task and
        target_task.approved and target_task.pm
    )
    if not eligible:
        return

    if not (target_task.update_interval and target_task.update_interval_units):
        return

    pm_events = ProgressEvent.objects.filter(
        Q(task=target_task) | Q(task__parent=target_task), type=PROGRESS_EVENT_TYPE_PM
    )
    periodic_start_date = pm_events.aggregate(latest_date=Max('due_at'))['latest_date']

    now = datetime.datetime.utcnow()
    if periodic_start_date and periodic_start_date > now:
        # An event is already scheduled in the future.
        return

    if not periodic_start_date:
        periodic_start_date = datetime.datetime.utcnow()
    if not periodic_start_date:
        return

    cursor = clean_update_datetime(periodic_start_date, target_task)
    while True:
        weekday = cursor.weekday()
        # Before Thursday (weekday 3): move to Thursday of the same week.
        # On or after Thursday: move to the following Monday.
        days_ahead = 3 - weekday if weekday < 3 else 7 - weekday
        candidate = cursor + relativedelta(days=days_ahead)

        if candidate < now:
            # Still in the past; keep stepping forward.
            cursor = candidate
            continue

        future_by_18_hours = now + relativedelta(hours=18)
        if candidate <= future_by_18_hours and (
                not target_task.deadline or candidate < target_task.deadline):
            same_day_events = ProgressEvent.objects.filter(
                task=target_task, type=PROGRESS_EVENT_TYPE_PM,
                due_at__contains=candidate.date()
            ).count()

            # Schedule at most one pm update for any day
            if same_day_events == 0:
                ProgressEvent.objects.update_or_create(
                    task=target_task, type=PROGRESS_EVENT_TYPE_PM,
                    due_at=candidate, defaults={'title': 'PM Report'}
                )
        break
def update_task_client_surveys(task):
    """Schedule the next 'Weekly Survey' progress event for a task's project.

    Sub-tasks are handled on their parent. Nothing happens unless the target
    is open, has ``survey_client`` set, is approved, has active participants
    and has an update interval configured. Candidate dates are successive
    Mondays; the first one that is not in the past is scheduled only if it
    lies within the next 18 hours, before the deadline (when there is one),
    and on a day without an existing client survey event.
    """
    task = clean_instance(task, Task)

    # for sub-tasks, create all surveys on the project
    target_task = task.parent if task.parent else task

    # only conduct survey for approved tasks that have been assigned devs and aren't closed
    survey_enabled = (
        not target_task.closed and
        (target_task.survey_client and target_task.approved and target_task.active_participants)
    )
    if not survey_enabled:
        return

    if not (target_task.update_interval and target_task.update_interval_units):
        return

    client_events = ProgressEvent.objects.filter(
        Q(task=target_task) | Q(task__parent=target_task), type=PROGRESS_EVENT_TYPE_CLIENT
    )
    periodic_start_date = client_events.aggregate(latest_date=Max('due_at'))['latest_date']

    now = datetime.datetime.utcnow()
    if periodic_start_date and periodic_start_date > now:
        # A survey is already scheduled in the future.
        return

    if not periodic_start_date:
        periodic_start_date = datetime.datetime.utcnow()
    if not periodic_start_date:
        return

    cursor = clean_update_datetime(periodic_start_date, target_task)
    while True:
        # Schedule next survey for Monday
        candidate = cursor + relativedelta(days=7 - cursor.weekday())

        if candidate < now:
            # Still in the past; keep stepping forward.
            cursor = candidate
            continue

        future_by_18_hours = now + relativedelta(hours=18)
        if candidate <= future_by_18_hours and (
                not target_task.deadline or candidate < target_task.deadline):
            same_day_events = ProgressEvent.objects.filter(
                task=target_task, type=PROGRESS_EVENT_TYPE_CLIENT,
                due_at__contains=candidate.date()
            ).count()

            # Schedule at most one survey for any day
            if same_day_events == 0:
                ProgressEvent.objects.update_or_create(
                    task=target_task, type=PROGRESS_EVENT_TYPE_CLIENT,
                    due_at=candidate, defaults={'title': 'Weekly Survey'}
                )
        break