我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 django.db.models.Max()。
def get_queryset(self):
    """Return the model's objects annotated with book and sales statistics.

    Only ``id``, ``salutation``, ``name`` and ``email`` are loaded eagerly.
    All aggregates are computed over the ``books`` relation and its
    ``order_lines``.
    """
    return (
        self.model.objects.all()
        .only('id', 'salutation', 'name', 'email')
        .annotate(
            # The query also joins ``books__order_lines``, so every book row
            # is repeated once per order line.  ``distinct=True`` keeps the
            # book count from being inflated by that join.
            number_of_books=Count('books', distinct=True),
            first_book_published_on=Min('books__publication_date'),
            last_book_published_on=Max('books__publication_date'),
            lowest_book_price=Min('books__price'),
            highest_book_price=Max('books__price'),
            # NOTE(review): the averages are computed over the same joined
            # rows, so books with several order lines weigh more - confirm
            # whether that is intended.
            average_book_price=Avg('books__price'),
            average_number_of_pages_per_book=Avg('books__pages'),
            number_of_books_sold=Count('books__order_lines'),
            total_amount_earned=Sum('books__order_lines__book__price'),
        )
    )
def test_non_existent_plugin_submission(self):
    """
    Posting a form for a plugin id that does not exist must raise
    ``ImproperlyConfigured``.
    """
    sample_name = self.get_random_string(10)
    sample_msg = self.get_random_string(100)

    # Get non-existent plugin ID.  ``aggregate`` yields None when the table
    # is empty, so fall back to 0 instead of failing on ``None + 1``.
    max_id = CMSPlugin.objects.aggregate(max=Max('id'))['max'] or 0
    bad_id = max_id + 1
    assert bad_id

    action_url = reverse(
        'cmsplugin_form_handler:process_form',
        args=(bad_id + 1, )
    )

    with self.assertRaises(ImproperlyConfigured):
        self.client.post(action_url, {
            'name': sample_name,
            'message': sample_msg,
            'cmsplugin_form_source_url': '/en/',
        }, follow=True)
def index(request):
    """Render ``msdn/index.html`` with the newest files and families.

    The context holds the ten most recently posted files, the ten product
    families with the most recent file, all product groups by name, the
    total file count and a banner flag that is true before 2017-11-10.
    """
    banner_expires_on = datetime.date(2017, 11, 10)
    families_by_latest_file = ProductFamily.objects.annotate(
        last_posted_date=Max('file__posted_date'),
    ).order_by('-last_posted_date')

    return render(request, 'msdn/index.html', {
        'show_fcu_banner': datetime.date.today() < banner_expires_on,
        'latest_files': File.objects.order_by("-posted_date")[:10],
        'latest_updated_families': families_by_latest_file[:10],
        'groups': ProductGroup.objects.order_by("name"),
        'total_count': File.objects.count(),
    })
def _date_filter_response(self, field):
    """Render the date-range filter template for ``field``.

    The overall minimum and maximum of ``field`` are taken from
    ``self._get_queryset_all()``.  A previously selected range from
    ``self.json_cfg`` is used unless ``ignore_selected_values`` is set; in
    that case, or without a selection, the overall range is pre-selected.
    """
    def as_date(value):
        # Datetimes are reduced to their date part; anything else passes through.
        if isinstance(value, datetime.datetime):
            return value.date()
        return value

    bounds = self._get_queryset_all().aggregate(Min(field), Max(field))
    min_date = as_date(bounds[field + '__min'])
    max_date = as_date(bounds[field + '__max'])

    selected_dates = self.json_cfg.get('selected_filter_values', None)
    if selected_dates and not self.json_cfg.get('ignore_selected_values', False):
        selected_min_date = parse(selected_dates['min_date']).date()
        selected_max_date = parse(selected_dates['max_date']).date()
        reset_button = True
    else:
        selected_min_date, selected_max_date = min_date, max_date
        reset_button = False

    context = {
        'min_date': min_date,
        'max_date': max_date,
        'selected_min_date': selected_min_date,
        'selected_max_date': selected_max_date,
        'reset_button': reset_button,
    }
    return render_to_response('ajaxviews/_select_date_filter.html', context)
def get_queryset(self):
    """
    Build the flattened QuerySet for this manager.

    Rows are limited to contests whose identifier scheme is
    ``calaccess_measure_id`` or that have no identifiers, then annotated
    with fields copied from the related membership and election.
    """
    base_qs = super(OCDFlatRetentionContestManager, self).get_queryset()
    measure_id_or_no_identifier = (
        Q(identifiers__scheme='calaccess_measure_id') |
        Q(identifiers__isnull=True)
    )
    return base_qs.filter(measure_id_or_no_identifier).annotate(
        office=F('membership__post__label'),
        office_holder=F('membership__person__name'),
        ocd_person_id=F('membership__person__id'),
        election_name=F('election__name'),
        election_date=F('election__date'),
        ocd_contest_id=F('id'),
        ocd_post_id=F('membership__post_id'),
        ocd_membership_id=F('membership_id'),
        ocd_election_id=F('election_id'),
        calaccess_measure_id=Max('identifiers__identifier')
    )
def get_queryset(self):
    """
    Build the flattened QuerySet for this manager.

    Rows are limited to contests whose identifier scheme is
    ``calaccess_measure_id`` or that have no identifiers, then annotated
    with election fields and the largest identifier value.
    """
    base_qs = super(OCDFlatBallotMeasureContestManager, self).get_queryset()
    measure_id_or_no_identifier = (
        Q(identifiers__scheme='calaccess_measure_id') |
        Q(identifiers__isnull=True)
    )
    return base_qs.filter(measure_id_or_no_identifier).annotate(
        election_name=F('election__name'),
        election_date=F('election__date'),
        ocd_contest_id=F('id'),
        ocd_election_id=F('election_id'),
        calaccess_measure_id=Max('identifiers__identifier')
    )
def sort_qs(self, qs):
    """Order ``qs`` by ``self.sort_by`` when a unit filter is active.

    ``qs`` is returned untouched if there is no unit filter or no sort key.
    For sort fields outside ``SIMPLY_SORTED`` the queryset is ordered by a
    ``Max()`` annotation of the field instead of by the field itself.
    """
    if not self.unit_filter or self.sort_by is None:
        return qs

    ordering = self.sort_by
    if self.sort_on not in SIMPLY_SORTED:
        # A leading `-` means descending; strip it to get the field name.
        descending = self.sort_by[0] == '-'
        if descending:
            aggregated_field = self.sort_by[1:]
            ordering = '-sort_by_field'
        else:
            aggregated_field = self.sort_by
            ordering = 'sort_by_field'

        # It's necessary to use `Max()` here because we can't
        # use `distinct()` and `order_by()` at the same time
        qs = qs.annotate(sort_by_field=Max(aggregated_field))

    return qs.order_by(ordering, "store__pootle_path", "index")
def get(self, request):
    """Render every user's best grade per task.

    The context contains ``tasks`` (slug and id of every task) and
    ``grades``: one dict per user holding ``'username'`` plus one
    ``task_id -> highest grade`` entry for each task the user submitted.
    """
    tasks = Task.objects.all().values("slug", "id")

    # One grouped query for all users instead of one query per user.
    best_grades = TaskSubmission.objects \
        .values('user__username', 'task_id') \
        .annotate(grade=Max('grade'))
    grades_by_username = {}
    for row in best_grades:
        per_user = grades_by_username.setdefault(row['user__username'], {})
        per_user[row['task_id']] = row['grade']

    usernames = get_user_model().objects.all().values_list(
        'username', flat=True)
    grades = []
    for username in usernames:
        user_grades = {'username': username}
        # Users without submissions still get a row with only the username.
        user_grades.update(grades_by_username.get(username, {}))
        grades.append(user_grades)

    context = {
        'grades': grades,
        'tasks': tasks
    }
    return render(request, self.template_name, context, status=200)
def forwards_func(apps, schema_editor):
    '''
    Fixing LateVedic for #41.
    See https://github.com/lingdb/CoBL/issues/41

    Appends the ``LateVedic`` language to the end of the ``Current``
    language list.  Does nothing when the language does not exist.
    '''
    # Models to work with:
    Language = apps.get_model('lexicon', 'Language')
    LanguageList = apps.get_model('lexicon', 'LanguageList')
    LanguageListOrder = apps.get_model('lexicon', 'LanguageListOrder')
    # Data to work with:
    try:
        language = Language.objects.get(ascii_name='LateVedic')
        languageList = LanguageList.objects.get(name='Current')
        # Read the aggregate by key: ``dict.values()`` cannot be indexed on
        # Python 3.
        N = LanguageListOrder.objects.filter(
            language_list=languageList).aggregate(Max("order"))["order__max"]
        # An empty list has no maximum (None); start numbering at 0 then.
        next_order = 0 if N is None else N + 1
        # Appending to 'Current' list:
        LanguageListOrder.objects.create(language=language,
                                         language_list=languageList,
                                         order=next_order)
    except Language.DoesNotExist:
        pass
def save(self, *args, **kwargs):
    """save(self, ..., save_descendants=True)

    Save this page, then re-save every other node returned by
    ``_branch_for_update()`` unless ``save_descendants`` is false.

    Before saving, ``path`` is rebuilt from the parent's path and the slug
    unless ``static_path`` is set.  A missing ``position`` becomes the
    highest position among pages with the same parent plus 10.
    """
    cascade = kwargs.pop('save_descendants', True)

    if not self.static_path:
        parent_path = self.parent.path if self.parent else '/'
        self.path = '{}{}/'.format(parent_path, self.slug)

    if not self.position:
        siblings = self.__class__._default_manager.filter(
            parent_id=self.parent_id,
        ).order_by()
        highest = siblings.aggregate(p=Max('position'))['p']
        self.position = (highest or 0) + 10

    super(AbstractPage, self).save(*args, **kwargs)

    if not cascade:
        return
    for pk, node in self._branch_for_update().items():
        # The branch contains this page as well; it was saved just above.
        if pk != self.pk:
            node.save(save_descendants=False)
def last_updated(self, related_name):
    """
    Annotate each row with the newest ``created`` value of a relation.

    ``related_name`` is the lookup name of the relation.  The result is
    stored under ``<related_name>_last_updated``.

    Usage:
        ad = Ad.objects.last_updated('rawdata').first()
        # assuming there's data related to ad
        print(ad.rawdata_last_updated)
        # prints the largest ``created`` value among the ad's raw data
    """
    alias = related_name + '_last_updated'
    newest_created = Max(related_name + '__created')
    return self.annotate(**{alias: newest_created})
def get_date(self, instance):
    """Return ``instance.date``, deriving and storing it when unset.

    When no date is set, the later of the newest ``Absence`` and the newest
    ``Report`` before today is used, looked up with ``user=instance.id``.
    ``date.min`` is the result when neither exists.
    """
    if instance.date is not None:
        return instance.date

    user = instance.id
    today = date.today()

    def latest_before_today(model):
        # Newest date strictly before today, or ``date.min`` without rows.
        found = model.objects.filter(
            user=user, date__lt=today).aggregate(date=Max('date'))
        return found['date'] or date.min

    latest_absence = latest_before_today(Absence)
    latest_report = latest_before_today(Report)
    instance.date = max(latest_absence, latest_report)
    return instance.date
def public_stats(request: HttpRequest) -> HttpResponse:
    """Display public galleries and archives stats.

    While ``enable_public_stats`` is off, staff users get an error page and
    everybody else a 404.
    """
    if not crawler_settings.urls.enable_public_stats:
        if request.user.is_staff:
            return render_error(request, "Page disabled by settings (urls: enable_public_stats).")
        raise Http404("Page not found")

    public_archives = Archive.objects.filter(public=True)
    public_tags = Tag.objects.filter(gallery_tags__public=True).distinct()

    stats_dict = {
        "n_archives": public_archives.count(),
        "archive": public_archives.filter(filesize__gt=0).aggregate(
            Avg('filesize'), Max('filesize'), Min('filesize'), Sum('filesize')),
        "n_tags": public_tags.count(),
        "top_10_tags": public_tags.annotate(
            num_archive=Count('gallery_tags')).order_by('-num_archive')[:10]
    }

    return render(request, "viewer/public_stats.html", {'stats': stats_dict})
def insupd_system_offer_group(slug, default_name=None):
    """
    Fetch the system offer group for ``slug``, creating it if needed.

    A new group is named ``default_name`` (or ``slug`` when that is empty).
    Its priority is one above the highest existing priority, or 1000 when
    no priority exists yet.
    """
    from .models import OfferGroup

    highest = OfferGroup.objects.all().aggregate(Max('priority')).get('priority__max')
    next_priority = (highest or 999) + 1

    group, created = OfferGroup.objects.get_or_create(
        slug=slug,
        is_system_group=True,
        defaults={
            'name': default_name or slug,
            'priority': next_priority,
        })
    if created:
        logger.info('Create system offer group: {}, priority {}'.format(group.slug, group.priority))
    return group
def get_activity(self, limit=20, offset=0, distinct=False, friends_only=False, request=None):
    """Return a slice of recent posts by friends and followed users.

    :param limit: maximum number of posts to return
    :param offset: number of newest posts to skip
    :param distinct: when true, keep only each creator's newest post
    :param friends_only: when true, this user's own id is left out
    :param request: optional request; when given, every post is set up for
        it and ``yeah_given`` reflects the requesting user
    :return: sliced queryset of posts, newest first
    """
    # Only needed for the anonymous fallback of ``yeah_given`` below.
    from django.db.models import BooleanField, Value

    #Todo: make distinct work; combine friends and following, but then get posts from them
    friends = Friendship.get_friendships(self, 0)
    friend_ids = [friend.other(self) for friend in friends]
    follows = self.follow_source.filter().values_list('target', flat=True)
    if not friends_only:
        friend_ids.append(self.id)
    friend_ids.extend(follows)

    # ``request`` defaults to None and its user may be anonymous.  The
    # original dereferenced it unconditionally and left the subquery
    # undefined for anonymous users.
    viewer = getattr(request, 'user', None)
    if viewer is not None and viewer.is_authenticated:
        has_yeah = Yeah.objects.filter(post=OuterRef('id'), by=viewer.id)
        yeah_given = Exists(has_yeah, distinct=True)
    else:
        yeah_given = Value(False, output_field=BooleanField())

    posts = Post.objects.select_related('creator').select_related('community').annotate(
        num_yeahs=Count('yeah', distinct=True),
        num_comments=Count('comment', distinct=True),
        yeah_given=yeah_given)
    if distinct:
        # Keep only the post whose timestamp equals its creator's newest one.
        posts = posts.annotate(
            max_created=Max('creator__post__created')
        ).filter(created=F('max_created'))
    posts = posts.filter(creator__in=friend_ids).order_by('-created')[offset:offset + limit]

    if request:
        for post in posts:
            post.setup(request)
            post.recent_comment = post.recent_comment()
    return posts
def add_entry(self, **kwargs):
    """Create a timetable entry for this meeting and return it.

    ``visible`` (default True) decides whether the entry gets the next free
    ``timetable_index`` or None.  ``index`` optionally sets the entry's
    position; None and -1 leave it alone.  Remaining keyword arguments are
    passed to ``timetable_entries.create``.
    """
    is_visible = kwargs.pop('visible', True)
    requested_index = kwargs.pop('index', None)

    next_index = None
    if is_visible:
        highest = self.timetable_entries.aggregate(
            models.Max('timetable_index'))['timetable_index__max']
        next_index = 0 if highest is None else highest + 1
    kwargs['timetable_index'] = next_index

    entry = self.timetable_entries.create(**kwargs)
    if not (requested_index is None or requested_index == -1):
        entry.index = requested_index
    if entry.optimal_start:
        entry.move_to_optimal_position()

    self._clear_caches()
    self.create_specialist_reviews()
    on_meeting_top_add.send(Meeting, meeting=self, timetable_entry=entry)
    return entry
def refresh(self, **kwargs):
    """Update this entry's attributes and handle visibility changes.

    ``visible`` (default True) is compared with the current state.  A
    hidden entry that becomes visible gets the next free
    ``timetable_index``.  A visible entry that becomes hidden loses its
    index and participations, and later entries move up by one.  All other
    keyword arguments are set as attributes before saving.
    """
    visible = kwargs.pop('visible', True)
    previous_index = self.timetable_index
    was_hidden = previous_index is None
    becoming_visible = visible and was_hidden
    becoming_hidden = not visible and not was_hidden

    if becoming_visible:
        highest = self.meeting.timetable_entries.aggregate(
            models.Max('timetable_index'))['timetable_index__max']
        kwargs['timetable_index'] = 0 if highest is None else highest + 1
    elif becoming_hidden:
        kwargs['timetable_index'] = None

    for attr, value in kwargs.items():
        setattr(self, attr, value)
    self.save()

    if becoming_hidden:
        # Close the gap left behind in the visible ordering.
        later_entries = self.meeting.timetable_entries.filter(
            timetable_index__gt=previous_index)
        later_entries.update(timetable_index=F('timetable_index') - 1)
        # invisible tops don't have participants
        self.participations.all().delete()

    self.meeting._clear_caches()
    self.meeting.create_specialist_reviews()
def save(self, **kwargs):
    """Fill in missing presenters and ``ec_number``, then save.

    Missing presenters default to the current user.  A missing
    ``ec_number`` becomes one above the highest number of the current year
    (numbers are ``year * 10000 + serial``), never below
    ``MIN_EC_NUMBER + 1`` within the year.
    """
    if not self.presenter_id:
        self.presenter = get_current_user()
    if not self.susar_presenter_id:
        self.susar_presenter = get_current_user()

    if not self.ec_number:
        with sudo():
            year = timezone.now().year
            first_of_year = year * 10000
            last_of_year = (year + 1) * 10000 - 1
            highest = Submission.objects.filter(
                ec_number__range=(first_of_year, last_of_year)
            ).aggregate(models.Max('ec_number'))['ec_number__max']

            if highest is None:
                base = first_of_year + MIN_EC_NUMBER
            else:
                year_part, serial = divmod(highest, 10000)
                base = year_part * 10000 + max(serial, MIN_EC_NUMBER)
            # XXX: this breaks if there are more than 9999 studies per year (FMD2)
            self.ec_number = base + 1

    return super().save(**kwargs)
def challenge_added(self, challenge):
    """Update ``min_score``/``max_score`` after a challenge was added.

    Nothing is recalculated while there is at most one challenge.  If no
    bounds are stored yet, they are taken from the lowest and highest
    ``points`` of all challenges, provided those differ.  Otherwise the
    stored bounds are widened to include ``challenge.points``.  The object
    is saved afterwards.
    """
    if self.challenge_set.count() > 1:
        if not self.min_score and not self.max_score:
            result = self.challenge_set.aggregate(Min('points'), Max('points'))
            # Compare by value.  ``is`` tests object identity, which for
            # integers only happens to work for small cached values.
            if result['points__min'] != result['points__max']:
                self.min_score = result['points__min']
                self.max_score = result['points__max']
        else:
            chal_p = challenge.points
            if chal_p > self.max_score:
                self.max_score = chal_p
            elif chal_p < self.min_score:
                self.min_score = chal_p
    self.save()
def save(self, *args, **kwargs):
    """Assign a sequential ``id_movimiento`` if missing, then save.

    The id is the first character of the movement type's ``codigo``, the
    operation year and a 7-digit counter.  The counter continues from the
    highest existing id among movements of the same year whose type has the
    same ``incrementa`` value.  ``anio`` and ``mes`` are copied from
    ``fecha_operacion``.
    """
    if self.id_movimiento == '':
        tipo = self.tipo_movimiento
        anio = self.fecha_operacion.year
        mov_ant = Movimiento.objects.filter(
            tipo_movimiento__incrementa=tipo.incrementa,
            fecha_operacion__year=anio,
        ).aggregate(Max('id_movimiento'))
        id_ant = mov_ant['id_movimiento__max']
        # The last 7 characters of an id are its counter.
        aux = 1 if id_ant is None else int(id_ant[-7:]) + 1
        correlativo = str(aux).zfill(7)
        self.id_movimiento = str(tipo.codigo[0:1]) + str(anio) + correlativo
    # The original lines ended in a comma, which stored 1-tuples such as
    # ``(2017,)`` instead of the numbers.
    # NOTE(review): set on every save here; confirm this should not be
    # limited to newly numbered movements.
    self.anio = self.fecha_operacion.year
    self.mes = self.fecha_operacion.month
    # Forward the arguments so callers can pass e.g. ``using`` or
    # ``update_fields``; the original dropped them.
    super(Movimiento, self).save(*args, **kwargs)
def get_lasts_messages_of_threads(self, participant_id, check_who_read=True, check_is_notification=True):
    """ Return the newest message of every thread the participant is active in.

    With ``check_who_read`` the messages are passed through
    ``self.check_who_read``; with ``check_is_notification`` through
    ``self.check_is_notification``.
    """
    # Two queries are needed because only Postgres supports
    # .order_by('thread', '-sent_at').distinct('thread')
    active_threads = Thread.managers.get_threads_where_participant_is_active(participant_id)
    active_threads = active_threads.annotate(last_message_id=Max('message__id'))
    newest_ids = [thread.last_message_id for thread in active_threads]

    messages = (
        Message.objects.filter(id__in=newest_ids)
        .order_by('-id')
        .distinct()
        .select_related('thread', 'sender')
    )

    if check_who_read is True:
        messages = messages.prefetch_related(
            'thread__participation_set',
            'thread__participation_set__participant')
        messages = self.check_who_read(messages)
    else:
        messages = messages.prefetch_related('thread__participants')

    if check_is_notification is True:
        messages = self.check_is_notification(participant_id, messages)
    return messages
def form_valid(self, form):
    """Save the new video and, if a series was chosen, list it there.

    The video's site comes from the ``website_id`` URL argument and its
    creator is the logged-in user.  When the form carries a series, the
    video is appended to it with the next free ``order``.  Redirects to
    the success URL afterwards.
    """
    video = form.save(commit=False)
    video.site = Site.objects.get(pk=self.kwargs['website_id'])
    video.creator = self.request.user
    video.save()
    messages.success(self.request, "%s was successfully created!" % video.title)

    selected_series = form.cleaned_data['series']
    if selected_series:
        # The listing is created for the first selected series, so the
        # next order must be computed for that same series.  The original
        # filtered on the whole selection.
        series = selected_series[0]
        max_order = Listing.objects.filter(series=series).aggregate(Max('order'))['order__max']
        listing_order = 1 if max_order is None else max_order + 1
        listing = Listing(series=series, video=video, order=listing_order)
        listing.save()
    return redirect(self.get_success_url())
def _check_next_forbidden(pk):
    """
    Check whether the student may move on from the given sequence item.

    :param pk: pk of the currently opened SequenceItem
    :return: tuple ``(next_forbidden, last_item, sequence_item)`` where
        ``next_forbidden`` is True when the item is the last one of its
        sequence, the collection is ``strict_forward`` and the item has no
        score yet; ``last_item`` is the highest position in the sequence;
        ``sequence_item`` is the SequenceItem instance for ``pk``
    """
    sequence_item = SequenceItem.objects.get(pk=pk)
    items_in_sequence = SequenceItem.objects.filter(sequence=sequence_item.sequence)
    last_item = items_in_sequence.aggregate(last_item=Max('position'))['last_item']

    next_forbidden = bool(
        sequence_item.position == last_item
        and sequence_item.sequence.collection.strict_forward
        and sequence_item.score is None
    )
    log.debug("Next item is forbidden: {}".format(next_forbidden))
    return next_forbidden, last_item, sequence_item
def get_or_assign_number(self):
    """
    Assign a unique order number if none is set and return ``get_number()``.

    The number is the current year followed by a five-digit, zero-padded
    counter.  The counter continues from the highest number among orders
    created after January 1st of this year.
    """
    if self.number is not None:
        return self.get_number()

    today = timezone.now().date()
    epoch = today.replace(today.year, 1, 1)
    orders_this_year = Order.objects.filter(number__isnull=False, created_at__gt=epoch)
    aggr = orders_this_year.aggregate(models.Max('number'))
    try:
        # Everything after the 4-digit year is the running counter.
        counter = int(str(aggr['number__max'])[4:]) + 1
    except (KeyError, ValueError):
        # the first order this year
        self.number = int('{0}00001'.format(epoch.year))
    else:
        self.number = int('{0}{1:05d}'.format(epoch.year, counter))
    return self.get_number()
def get_stats(self):
    """
    Collect the maximum rates, stars and watches over all ``BiobrickMeta``
    rows, together with the fixed ``max_rate_score`` of 5.
    """
    from biohub.biobrick.models import BiobrickMeta
    from django.db.models import Max

    self.stdout.write("Collecting statistical data...")

    stats = {'max_rate_score': 5}
    stats.update(BiobrickMeta.objects.aggregate(
        max_rates=Max('rates'),
        max_stars=Max('stars'),
        max_watches=Max('watches')
    ))
    return stats
def _load_metabolites(self):
    """Load ``bootstrap-metabolite.json`` with shifted primary keys.

    While the fixture loads, the ``to_python`` methods of the Metabolite
    and MeasurementType primary keys are replaced so that every loaded key
    is offset by the current highest MeasurementType pk.  The original
    methods are restored afterwards, also when loading fails.
    """
    with transaction.atomic(using=self.using):
        # Replace PK to_python methods
        metabolite_pk = models.Metabolite._meta.pk
        type_pk = models.MeasurementType._meta.pk
        metabolite_tp = metabolite_pk.to_python
        type_tp = type_pk.to_python
        type_objects = models.MeasurementType.objects.using(self.using)
        # An empty table has no maximum (None); use an offset of 0 then.
        max_value = type_objects.aggregate(max=Max('pk'))['max'] or 0
        metabolite_pk.to_python = lambda v: max_value + metabolite_tp(v)
        type_pk.to_python = lambda v: max_value + type_tp(v)
        try:
            # Import the fixture data
            call_command(
                'loaddata',
                'bootstrap-metabolite.json',
                app_label='main',
                database=self.using
            )
        finally:
            # Restore PK to_python methods even if loaddata raised, so the
            # patched fields do not leak into the rest of the process.
            metabolite_pk.to_python = metabolite_tp
            type_pk.to_python = type_tp
def _load_proteins(self):
    """Load ``bootstrap-proteins.json`` with shifted primary keys.

    While the fixture loads, the ``to_python`` methods of the
    ProteinIdentifier and MeasurementType primary keys are replaced so that
    every loaded key is offset by the current highest MeasurementType pk.
    The original methods are restored afterwards, also when loading fails.
    """
    with transaction.atomic(using=self.using):
        # Replace PK to_python methods
        protein_pk = models.ProteinIdentifier._meta.pk
        type_pk = models.MeasurementType._meta.pk
        protein_tp = protein_pk.to_python
        type_tp = type_pk.to_python
        type_objects = models.MeasurementType.objects.using(self.using)
        # An empty table has no maximum (None); use an offset of 0 then.
        max_value = type_objects.aggregate(max=Max('pk'))['max'] or 0
        protein_pk.to_python = lambda v: max_value + protein_tp(v)
        type_pk.to_python = lambda v: max_value + type_tp(v)
        try:
            # Import the fixture data
            call_command(
                'loaddata',
                'bootstrap-proteins.json',
                app_label='main',
                database=self.using
            )
        finally:
            # Restore PK to_python methods even if loaddata raised, so the
            # patched fields do not leak into the rest of the process.
            protein_pk.to_python = protein_tp
            type_pk.to_python = type_tp
def order_by_time_remaining(self):
    """Return the artists as a list grouped by campaign state.

    Order of the groups: running campaigns (soonest end first), campaigns
    without an end date, finished campaigns (most recently ended first),
    and finally artists without any campaign.
    """
    artists = self.annotate(
        campaign_end_datetime=models.Max('project__campaign__end_datetime'))

    groups = (
        artists.filter(
            campaign_end_datetime__gte=timezone.now()
        ).order_by('campaign_end_datetime'),
        artists.filter(
            project__campaign__isnull=False,
            campaign_end_datetime__isnull=True),
        artists.filter(
            campaign_end_datetime__lt=timezone.now()
        ).order_by('-campaign_end_datetime'),
        artists.filter(project__campaign__isnull=True),
    )

    ordered = []
    for group in groups:
        ordered.extend(group)
    return ordered
def versiones(request):
    """Return the newest ``Version`` of every ``tipo`` as a list of dicts.

    Each dict has the keys ``tipo``, ``timestamp`` (as string), ``name``
    and ``noticia``.
    """
    latest_per_tipo = Version.objects.values('tipo').annotate(timestamp_max=Max('timestamp'))

    # hack taken from https://gist.github.com/ryanpitts/1304725
    # OR together one (tipo, newest timestamp) condition per tipo.
    newest = Q()
    for row in latest_per_tipo:
        newest = newest | (Q(tipo__exact=row['tipo']) & Q(timestamp=row['timestamp_max']))

    payload = []
    for version in Version.objects.filter(newest):
        payload.append({
            'tipo': version.tipo,
            'timestamp': str(version.timestamp),
            'name': version.name,
            'noticia': version.noticia,
        })
    return payload
def save(self, *args, **kwargs):
    """Normalise the sale item and save it.

    An ``order_index`` of 0 is replaced by the next free index within the
    sale.  Blank items have price, quantity, item, text and VAT rate
    reset.  The discount is recalculated before saving.
    """
    if self.order_index == 0:
        items_of_sale = SaleItem.objects.filter(sale=self.sale)
        highest = items_of_sale.aggregate(max_value=Max('order_index'))['max_value']
        self.order_index = (highest or 0) + 1

    if self.is_blank:
        self.pre_tax_price = Decimal(0)
        self.quantity = 0
        self.item = None
        self.text = ''
        self.vat_rate = None

    self.calculate_discount()
    return super(SaleItem, self).save(*args, **kwargs)
def set_choices(self,user):
    """ Offer all other active non-superusers as ``mitspieler`` choices.

    Each label shows the user and the date of the newest related story
    part, or "never".  No user is pre-selected.
    """
    candidates = User.objects\
        .annotate(lastpart=Max('teller__storyparts__creation'))\
        .filter(is_superuser=False, is_active=True)\
        .exclude(id=user.id).all()

    choices = []
    for candidate in candidates:
        last_active = candidate.lastpart.strftime("%Y-%m-%d") if candidate.lastpart else "never"
        assert candidate != user and not candidate.is_superuser and candidate.is_active
        label = "%s (last active: %s)" % (str(candidate), last_active)
        choices.append((candidate.pk, label))

    field = self.fields['mitspieler']
    field.choices = choices
    field.initial = []
def test_returns_on_no_content_or_reply(self, mock_filter):
    """The filter must not be called for a missing content id or a reply."""
    highest_id = Content.objects.aggregate(max_id=Max("id")).get("max_id")
    missing_id = highest_id + 1
    add_to_stream_for_users(missing_id, Mock(), PublicStream, self.content.author.id)
    self.assertFalse(mock_filter.called)

    add_to_stream_for_users(self.reply.id, self.reply.id, PublicStream, self.reply.author.id)
    self.assertFalse(mock_filter.called)
def group_detail(request, group_id):
    """Render a product group with its families.

    Families are ordered by name and annotated with their file count and
    newest file date.  Responds with 404 when the group does not exist.
    """
    group = get_object_or_404(ProductGroup, pk=group_id)
    families = ProductFamily.objects.filter(group_id=group_id)
    families = families.annotate(
        Count('file'), Max('file__posted_date')
    ).order_by("name")
    return render(request, 'msdn/group_detail.html',
                  {'group': group, 'families': families})
def get_queryset(self):
    """Return a values queryset of the model's basic fields and book stats.

    Each row is a dict with ``id``, ``salutation``, ``name``, ``email`` and
    the aggregates computed over ``books`` and their ``order_lines``.
    """
    return (
        self.model.objects.all()
        .annotate(
            # The query also joins ``books__order_lines``, so every book row
            # is repeated once per order line.  ``distinct=True`` keeps the
            # book count from being inflated by that join.
            number_of_books=Count('books', distinct=True),
            first_book_published_on=Min('books__publication_date'),
            last_book_published_on=Max('books__publication_date'),
            lowest_book_price=Min('books__price'),
            highest_book_price=Max('books__price'),
            # NOTE(review): the averages are computed over the same joined
            # rows, so books with several order lines weigh more - confirm
            # whether that is intended.
            average_book_price=Avg('books__price'),
            average_number_of_pages_per_book=Avg('books__pages'),
            number_of_books_sold=Count('books__order_lines'),
            total_amount_earned=Sum('books__order_lines__book__price'),
        )
        .values(
            'id',
            'salutation',
            'name',
            'email',
            'number_of_books',
            'first_book_published_on',
            'last_book_published_on',
            'lowest_book_price',
            'highest_book_price',
            'average_book_price',
            'average_number_of_pages_per_book',
            'number_of_books_sold',
            'total_amount_earned',
        )
    )
def save_model(self, request, obj, form, change):
    """Give new objects the next value of the sortable field, then save.

    The value is the current maximum of ``self.sortable`` over all objects
    of the same class plus one, or 1 when there is no maximum yet.
    """
    is_new = not obj.pk
    if is_new:
        current = obj.__class__.objects.aggregate(models.Max(self.sortable))
        try:
            position = current['%s__max' % self.sortable] + 1
        except TypeError:
            # No rows yet: the aggregate is None.
            position = 1
        setattr(obj, self.sortable, position)
    super(SortableModelAdmin, self).save_model(request, obj, form, change)

# Quite aggressive detection and intrusion into Django CMS
# Didn't find any other solutions though
def customer_list(request):
    """Render the paginated dashboard customer list, ordered by email.

    Every user is annotated with the number of orders (``num_orders``) and
    the highest order value (``last_order``).
    """
    users = User.objects.prefetch_related('orders', 'addresses')
    users = users.select_related(
        'default_billing_address', 'default_shipping_address')
    users = users.annotate(
        num_orders=Count('orders', distinct=True),
        last_order=Max('orders', distinct=True))
    users = users.order_by('email')

    page = get_paginator_items(
        users, DASHBOARD_PAGINATE_BY, request.GET.get('page'))
    return TemplateResponse(
        request, 'dashboard/customer/list.html', {'customers': page})
def save(self, *args, **kwargs):
    """Save the image, appending it to the ordering if ``order`` is unset.

    The new order is one above the current maximum of
    ``get_ordering_queryset()``, or 0 when that queryset has no maximum.
    """
    if self.order is None:
        highest = self.get_ordering_queryset().aggregate(Max('order')).get('order__max')
        if highest is None:
            self.order = 0
        else:
            self.order = highest + 1
    super(ProductImage, self).save(*args, **kwargs)
def setup(self):
    """Pick a leaf node for the test and store it in ``self.leaf``.

    ``self.root`` and ``self.branch`` (when present) and their descendants
    are excluded.  The middle element of the remaining leaf candidates is
    chosen; the test is skipped when there are none.
    """
    super(GetLeafMixin, self).setup()
    qs = self.model._default_manager.all()

    for attr in ('root', 'branch'):
        if not hasattr(self, attr):
            continue
        node = getattr(self, attr)
        descendants = node.get_descendants()
        if isinstance(descendants, list):
            descendants = [d.pk for d in descendants]
        qs = qs.exclude(pk=node.pk).exclude(pk__in=descendants)

    if self.model in (TreebeardMPPlace, TreebeardNSPlace):
        qs = qs.annotate(n=Max('depth')).filter(depth=F('n'), depth__gt=1)
    else:
        qs = qs.filter(children__isnull=True, parent__isnull=False)

    try:
        self.leaf = qs[qs.count() // 2]
    except IndexError:
        raise SkipTest

#
# Children
#
def max_column(queryset, column, default):
    """Return the maximum of ``column`` in ``queryset``, or ``default``
    when the aggregate is None."""
    highest = queryset.aggregate(result=Max(column))['result']
    return default if highest is None else highest
def get(self, request, task_id):
    """Render the task's submissions with each user's highest grade."""
    data_url = "/teachers/submissions/" + task_id
    best_grade_per_user = self.task.submissions \
        .values('user', 'user__username', 'user__first_name', 'user__last_name') \
        .annotate(grade=Max('grade'))
    context = {
        'task_id': task_id,
        'data_url': data_url,
        'submissions': best_grade_per_user,
    }
    return render(request, self.template_name, context, status=200)
def append(self, language):
    """Add ``language`` to the end of this LanguageList ordering.

    The new entry gets the highest existing ``order`` plus one, or 0 when
    the list has no entries yet.
    """
    position = self.languagelistorder_set.aggregate(Max("order"))["order__max"]
    try:
        position += 1
    except TypeError:
        # Only an empty list (no maximum) may end up here.
        assert position is None
        position = 0
    LanguageListOrder.objects.create(
        language=language,
        language_list=self,
        order=position)
def append(self, meaning):
    """Add ``meaning`` to the end of this MeaningList ordering.

    The new entry gets the highest existing ``order`` plus one, or 0 when
    the list has no entries yet.
    """
    position = self.meaninglistorder_set.aggregate(Max("order"))["order__max"]
    try:
        position += 1
    except TypeError:
        # Only an empty list (no maximum) may end up here.
        assert position is None
        position = 0
    MeaningListOrder.objects.create(
        meaning=meaning,
        meaning_list=self,
        order=position)
def get_context_data(self, **kwargs):
    """Add the filtered, paginated live courses to the template context.

    The ``status`` query parameter selects courses that have not started
    (``to_start``), are running (``under_way``) or are over (``end``),
    judged by their earliest slot start and latest slot end.  Any other
    value lists all courses.  Results are ordered by descending id.
    """
    params = self.request.GET
    kwargs['query_data'] = params.dict()
    status = params.get('status')
    page = params.get('page')

    live_courses = models.LiveCourse.objects.all()
    now = timezone.now()

    if status == 'to_start':
        live_courses = live_courses.annotate(
            start_time=Min("livecoursetimeslot__start")
        ).filter(start_time__gt=now)
    elif status == 'under_way':
        live_courses = live_courses.annotate(
            start_time=Min("livecoursetimeslot__start")
        ).annotate(
            end_time=Max("livecoursetimeslot__end")
        ).filter(start_time__lte=now).filter(end_time__gte=now)
    elif status == 'end':
        live_courses = live_courses.annotate(
            end_time=Max("livecoursetimeslot__end")
        ).filter(end_time__lt=now)

    live_courses = live_courses.order_by('-id')
    # paginate
    live_courses, pager = paginate(live_courses, page)
    kwargs['live_courses'] = live_courses
    kwargs['pager'] = pager
    return super(LiveCourseListView, self).get_context_data(**kwargs)
def get_queryset(self):
    """Return the queryset for the current action.

    For ``list`` the rows are limited to those whose latest time slot ends
    in the future, ordered by earliest slot start.  A ``school`` query
    parameter further restricts the rows to that school.
    """
    result = self.queryset
    # NOTE(review): the original comment here was lost to an encoding
    # error.  The branch below hides entries whose last slot has ended.
    if self.action == 'list':
        result = result.annotate(
            start=Min('live_course__livecoursetimeslot__start'),
            end=Max('live_course__livecoursetimeslot__end'),
        )
        result = result.filter(end__gt=timezone.now()).order_by('start')

    # NOTE(review): applied to every action here; the original nesting of
    # this filter could not be recovered - confirm.
    school_id = self.request.query_params.get('school')
    if school_id:
        result = result.filter(class_room__school_id=school_id)
    return result
def set_receipt_id(self, retry: int=0) -> None:
    """Assign the next free ``receipt_id`` and save only that field.

    The id is one above the highest existing ``receipt_id`` (1 when there
    is none).  On any exception the attempt is repeated up to ``retry``
    more times; the last exception is raised once retries are used up.
    """
    attempts_left = retry
    while True:
        try:
            highest = Transaction.objects.aggregate(m=Max('receipt_id'))['m']
            self.receipt_id = 1 + (highest or 0)
            self.save(update_fields=['receipt_id'])
        except Exception as e:
            if attempts_left > 0:
                attempts_left -= 1
                continue
            raise e
        else:
            return
def save_model(self, request, obj, form, change):
    """Give new objects the next value of the sortable field, then save.

    The value is the current maximum of ``self.sortable`` over all objects
    of the same class plus one, or 1 when there is no maximum yet.
    """
    is_new = not obj.pk
    if is_new:
        current = obj.__class__.objects.aggregate(models.Max(self.sortable))
        try:
            position = current['%s__max' % self.sortable] + 1
        except TypeError:
            # No rows yet: the aggregate is None.
            position = 1
        setattr(obj, self.sortable, position)
    super(SortableModelAdmin, self).save_model(request, obj, form, change)
def relativeToDbTimestamp(self, relativeValue):
    """Map a relative position onto the time range covered by the probes.

    The range runs from the earliest to the latest start time found in the
    ping and transfer results.  ``relativeValue`` is interpolated linearly
    between them, so 0 gives the start and 1 the end.

    :param relativeValue: factor applied to the length of the range
    :return: naive ``datetime``; ``utcfromtimestamp(0)`` when there are no
        probes or anything goes wrong
    """
    epoch = datetime.datetime.utcfromtimestamp(0)
    try:
        pingProbes = PingTestResult.objects
        latestPingProbe = pingProbes.aggregate(Max('pingStart'))['pingStart__max']
        firstPingProbe = pingProbes.aggregate(Min('pingStart'))['pingStart__min']
        transferProbes = TransferTestResult.objects
        latestTransferProbe = transferProbes.aggregate(Max('transferStart'))['transferStart__max']
        firstTransferProbe = transferProbes.aggregate(Min('transferStart'))['transferStart__min']

        # in case of missing probes
        minTime, maxTime = firstPingProbe, latestPingProbe
        if minTime is None or maxTime is None:
            minTime, maxTime = firstTransferProbe, latestTransferProbe
        if minTime is None or maxTime is None:
            return epoch

        try:
            # Widen the range when the transfer probes reach further.
            if firstPingProbe > firstTransferProbe:
                minTime = firstTransferProbe
            if latestPingProbe < latestTransferProbe:
                maxTime = latestTransferProbe
        except TypeError:
            # One side is None (empty table) or the values are not
            # comparable; keep the range chosen above.
            pass

        start = time.mktime(minTime.timetuple())
        end = time.mktime(maxTime.timetuple())
        moment = start + relativeValue * (end - start)
        return datetime.datetime.utcfromtimestamp(moment)
    except Exception:
        # Best-effort fallback kept from the original, but no longer a bare
        # ``except`` that would also swallow SystemExit/KeyboardInterrupt.
        return epoch
def save(self, *args, **kwargs):
    """Save the service, giving it the next free ``order`` if it has none.

    The next order is the highest ``order`` of all services plus one, or 1
    when there is no maximum.
    """
    if not self.order:
        highest = Service.objects.aggregate(n=models.Max('order'))['n']
        self.order = 1 + (highest or 0)
    return super(Service, self).save(*args, **kwargs)
def move_to_x(self, where, request, queryset):
    """Move the selected service to the top or bottom of the ordering.

    ``where`` is ``'top'`` for position 1; any other value targets the
    highest existing ``order``.  Services between the old and the new
    position are shifted by one.  More than one selected service, or a
    service already at the target, only produces a message.
    """
    if queryset.count() > 1:
        self.message_user(request, 'Please select only one service')
        return

    to_top = where == 'top'
    if to_top:
        target = 1
    else:
        target = Service.objects.aggregate(n=Max('order'))['n'] or 1

    selection = queryset.last()
    if selection.order == target:
        self.message_user(request, '%s is already at the %s!' % (
            selection.name, where))
        return

    if to_top:
        # Everything above the selection moves one step down.
        Service.objects.filter(
            order__lt=selection.order).update(order=F('order') + 1)
    else:
        # Everything below the selection moves one step up.
        Service.objects.filter(
            order__gt=selection.order).update(order=F('order') - 1)

    selection.order = target
    selection.save()
    self.message_user(request, 'Successfully moved "{}" to the {}'.format(
        selection.name, where))