我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db.models.F。
def donations(request):
    """Return FEC/ProPublica donation data from the Django ORM cache as JSON.

    Response shape:
      representatives: {id: {id, name, party}}
      committees:      {id: {id, name}}
      donations:       [{source, destination, support, amount}]
    """
    data = {}
    # Concatenate first/last name into a single display name per representative.
    data["representatives"] = {d["id"]: d for d in Representative.objects.all()
                               .annotate(name=Concat('first_name', Value(" "), 'last_name'))
                               .values("id", "name", "party")}
    data["committees"] = {d["id"]: d for d in SuperPAC.objects.all().values("id", "name")}
    # Group donations by (superpac, representative, support) and sum the amounts.
    data["donations"] = list(Donation.objects.all()
                             .annotate(source=F("superpac_id"), destination=F("representative_id"))
                             .values("source", "destination", "support")
                             .annotate(amount=Sum("amount")))
    return JsonResponse(data)
def make_inactive_productlist_query(queryset): now = timezone.now() # Create a query of things are definitively inactive. Some of the ones # filtered here might be out of stock, but we include that later. inactive_candidates = ( queryset .exclude( Q(active=True) & (Q(deactivate_date=None) | Q(deactivate_date__gte=now))) .values("id") ) inactive_out_of_stock = ( queryset .filter(sale__timestamp__gt=F("start_date")) .annotate(c=Count("sale__id")) .filter(c__gte=F("quantity")) .values("id") ) return ( queryset .filter( Q(id__in=inactive_candidates) | Q(id__in=inactive_out_of_stock)) )
def _lecturas_del_periodo(anio, mes, quincena=None, region_id=None, funcion=Avg):
    """Aggregate price readings (Lectura) of approved samples for a period.

    Args:
        anio, mes: year and month of the samples.
        quincena: optional fortnight filter.
        region_id: optional region primary key to narrow the samples.
        funcion: aggregate applied to 'precio' (defaults to Avg).

    Returns:
        Queryset of {comercio, producto, valor, c_p} rows, ordered by
        generic-product id then store name.
    """
    muestras = Muestra.objects.filter(anio=anio, mes=mes, aprobada=True)
    if quincena:
        muestras = muestras.filter(quincena=quincena)
    # Only readings with a positive price are considered.
    lecturas = Lectura.objects.filter(muestra__in=muestras, precio__gt=0)
    if region_id:
        lecturas = lecturas.filter(
            muestra__planilla_de_relevamiento__zona__jurisdiccion__region__pk=region_id)
    # c_p is a store+product composite key used downstream.
    lecturas = lecturas.annotate(orden=F("producto_con_marca__producto_generico__id"))\
        .annotate(producto=F("producto_con_marca__producto_generico__nombre"))\
        .annotate(comercio=F('muestra__planilla_de_relevamiento__comercio__nombre'))\
        .values('comercio', 'producto')\
        .annotate(valor=funcion('precio'))\
        .annotate(c_p=Concat(F('muestra__planilla_de_relevamiento__comercio__nombre'),
                             F("producto_con_marca__producto_generico__nombre")))\
        .order_by('orden', 'comercio')
    return lecturas
def donationsDemo(request):
    """Return hand-picked FEC/ProPublica donation data for demo purposes.

    Same response shape as donations(), but restricted to a fixed set of
    representatives and super PACs selected for the demo.
    """
    data = {"representatives": {}, "committees": {}, "donations": []}
    # Hand-picked representatives for the demo; set membership is O(1).
    # (The previous unused id lists special_reps/special_pacs were removed.)
    demo_rep_names = {'Ted Cruz', 'Marco Rubio', 'Tammy Duckworth',
                      'Rand Paul', 'Ruben Kihuen', 'Stephanie Murphy'}
    for rep in Representative.objects.all():
        if rep.first_name + " " + rep.last_name in demo_rep_names:
            data["representatives"][rep.id] = rep.__json__()
    demo_pac_names = {'MAKE DC LISTEN', 'COURAGEOUS CONSERVATIVES PAC',
                      'CONSERVATIVE SOLUTIONS PAC', 'RIGHT TO RISE USA',
                      'NEXTGEN CLIMATE ACTION COMMITTEE', 'AMERICA\'S LIBERTY PAC',
                      'STAND FOR TRUTH', 'TEXAS TEA PARTY PATRIOTS',
                      'FLORIDIANS FOR A STRONG MIDDLE CLASS', 'HOUSE MAJORITY PAC',
                      'PLANNED PARENTHOOD VOTES', 'IMMIGRANT VOTERS WIN PAC'}
    for sup in SuperPAC.objects.all():
        if sup.name in demo_pac_names:
            data["committees"][sup.id] = sup.__json__()
    # Group donations by (superpac, representative, support) and sum the amounts.
    data["donations"] = list(Donation.objects.all()
                             .annotate(source=F("superpac_id"),
                                       destination=F("representative_id"))
                             .values("source", "destination", "support")
                             .annotate(amount=Sum("amount")))
    return JsonResponse(data)
def query_or(cls, query, *values_list, **annotations):
    """Filter `cls` by OR-ing `query` (a Q or an iterable of Qs) and return
    a values queryset annotated with F() aliases.

    The special keyword 'pop_annotations' (default False) strips the
    annotated source values from each result dict before returning.
    """
    # dict.pop with a default replaces the original test/read/delete dance.
    pop_annotations = annotations.pop('pop_annotations', False)
    # Snapshot the source field names before rebinding `annotations`.
    annotated_keys = list(annotations.values())
    annotations = {key: F(value) for key, value in annotations.items()}
    if isinstance(query, Iterable):
        # Combine multiple Q objects with OR.
        query = reduce(or_, query)
    result = cls.objects.filter(query).values(*values_list).annotate(**annotations)
    if pop_annotations:
        for querydict in result:
            for value in annotated_keys:
                querydict.pop(value)
    return result


# tax types applicable to products
def make_active_productlist_query(queryset): now = timezone.now() # Create a query for the set of products that MIGHT be active. Might # because they can be out of stock. Which we compute later active_candidates = ( queryset .filter( Q(active=True) & (Q(deactivate_date=None) | Q(deactivate_date__gte=now))) ) # This query selects all the candidates that are out of stock. candidates_out_of_stock = ( active_candidates .filter(sale__timestamp__gt=F("start_date")) .annotate(c=Count("sale__id")) .filter(c__gte=F("quantity")) .values("id") ) # We can now create a query that selects all the candidates which are not # out of stock. return ( active_candidates .exclude( Q(start_date__isnull=False) & Q(id__in=candidates_out_of_stock)))
def like(request):
    """Adjust a Post's score when the user likes ("true") or unlikes
    ("false") it, and return the fresh score as JSON.

    Any other 'like' value yields HTTP 400.
    """
    id = request.GET.get("id", default=None)
    like = request.GET.get("like")
    obj = get_object_or_404(Post, id=int(id))
    if like == "true":
        # An F() expression lets the database increment/decrement the column
        # in place instead of a read-modify-write round trip in Python.
        obj.score = F("score") + 1
        obj.save(update_fields=["score"])
    elif like == "false":
        obj.score = F("score") - 1
        obj.save(update_fields=["score"])
    else:
        return HttpResponse(status=400)
    # Reload so obj.score holds the concrete value, not the F expression.
    obj.refresh_from_db()
    return JsonResponse({"like": obj.score, "id": id})
def get_queryset(self):
    """Get queryset.

    Fetches books with publisher and authors preloaded, restricted to the
    columns actually used, and annotates each row with price_per_page.
    """
    return self.model.objects.all() \
        .select_related('publisher') \
        .prefetch_related('authors') \
        .only('id', 'title', 'pages', 'price',
              'publisher__id', 'publisher__name',
              'authors__id', 'authors__name') \
        .annotate(
            # NOTE(review): Sum() aggregates price/pages over the grouped
            # rows — presumably one logical row per book; confirm the
            # authors join does not inflate the sum.
            price_per_page=Sum(
                F('price') / F('pages'),
                output_field=DecimalField(decimal_places=2)
            )
        )
def test_filtered_order_by(self):
    """Ordering on translated/i18n fields must honour the active language."""
    Blog.objects.bulk_create([
        Blog(title='Falcon', title_nl='Valk'),
        Blog(title='Frog', title_nl='Kikker'),
        Blog(title='Fox', title_nl='Vos'),
        Blog(title='Gecko'),
        Blog(title='Gerbil'),
        Blog(title='Vulture', title_nl='Gier')
    ])
    # Explicit language-suffixed fields order by that language's values.
    qs = Blog.objects.filter(title_en__contains='F').order_by('title_nl')
    self.assertEquals(key(qs, 'title_nl'), ['Kikker', 'Valk', 'Vos'])
    qs = Blog.objects.filter(title_en__contains='G').order_by('title_en')
    self.assertEquals(key(qs, 'title'), ['Gecko', 'Gerbil'])
    # The i18n virtual field follows the override()d language.
    with override('nl'):
        qs = Blog.objects.filter(title_i18n__contains='G').order_by('title_i18n')
        self.assertEquals(key(qs, 'title_i18n'), ['Gecko', 'Gerbil', 'Gier'])
    with override('en'):
        qs = Blog.objects.filter(title_i18n__contains='G').order_by('-title_i18n')
        self.assertEquals(key(qs, 'title_i18n'), ['Gerbil', 'Gecko'])
        # Ordering on the default language should not add an annotation.
        self.assertTrue('annotation' not in str(qs.query))
def get_queryset(self):
    """
    Returns the custom QuerySet for this manager.

    Restricts to contests that carry a CAL-ACCESS measure id (or none at
    all) and flattens related office/person/election columns into
    annotations for easy export.
    """
    return super(
        OCDFlatRetentionContestManager, self
    ).get_queryset().filter(
        Q(identifiers__scheme='calaccess_measure_id') |
        Q(identifiers__isnull=True)
    ).annotate(
        office=F('membership__post__label'),
        office_holder=F('membership__person__name'),
        ocd_person_id=F('membership__person__id'),
        election_name=F('election__name'),
        election_date=F('election__date'),
        ocd_contest_id=F('id'),
        ocd_post_id=F('membership__post_id'),
        ocd_membership_id=F('membership_id'),
        ocd_election_id=F('election_id'),
        # Max() collapses multiple identifier rows into one value.
        calaccess_measure_id=Max('identifiers__identifier')
    )
def get_queryset(self):
    """
    Returns the custom QuerySet for this manager.

    Restricts to ballot-measure contests carrying a CAL-ACCESS measure id
    (or no identifier) and flattens election columns into annotations.
    """
    return super(
        OCDFlatBallotMeasureContestManager, self
    ).get_queryset().filter(
        Q(identifiers__scheme='calaccess_measure_id') |
        Q(identifiers__isnull=True)
    ).annotate(
        election_name=F('election__name'),
        election_date=F('election__date'),
        ocd_contest_id=F('id'),
        ocd_election_id=F('election_id'),
        # Max() collapses multiple identifier rows into one value.
        calaccess_measure_id=Max('identifiers__identifier')
    )
def handle(self, *args, **options):
    """Management command: prune each check's ping history down to the
    owning user's ping_log_limit."""
    # Create any missing user profiles
    for user in User.objects.filter(profile=None):
        Profile.objects.get_or_create(user_id=user.id)
    checks = Check.objects.filter(user__isnull=False)
    checks = checks.annotate(limit=F("user__profile__ping_log_limit"))
    for check in checks:
        # Keep only the newest `limit` pings: delete those whose sequence
        # number n falls below n_pings - limit (n > 0 skips sentinel rows).
        q = Ping.objects.filter(owner_id=check.id)
        q = q.filter(n__lt=check.n_pings - check.limit)
        q = q.filter(n__gt=0)
        n_pruned, _ = q.delete()
        self.stdout.write("Pruned %d pings for check %s (%s)" %
                          (n_pruned, check.id, check.name))
    return "Done!"
def update_balance(self, reason, money, **kwargs):
    """Record a MoneyLog row for a balance change on this user.

    Positive `money` is logged as credit, negative as debit. The log row is
    fetched-or-created idempotently; an IntegrityError from a concurrent
    insert falls back to fetching the row.

    NOTE(review): the actual balance UPDATE is commented out below, so this
    currently only writes the log row — confirm that is intentional.
    """
    from billing.models import MoneyLog
    ml_filter = {}
    task = kwargs.get('task', None)
    ml_filter['reason'] = reason
    ml_filter['user'] = self
    ml_filter['debit'] = math.fabs(money) if money < 0 else 0
    ml_filter['credit'] = math.fabs(money) if money > 0 else 0
    ml_filter['money'] = money
    ml_filter['task'] = task
    # Lock the user row so the recorded balance snapshot is consistent.
    current_balance = User.objects.select_for_update().get(pk=self.pk).balance
    ml_filter['balance'] = current_balance + Decimal(money)
    try:
        ml = MoneyLog.objects.get(**ml_filter)
    except MoneyLog.DoesNotExist:
        try:
            ml = MoneyLog.objects.create(**ml_filter)
        except IntegrityError:
            # Another transaction created the same log row concurrently.
            ml = MoneyLog.objects.get(**ml_filter)
    # User.objects.select_for_update().filter(pk=self.pk).update(
    #     balance=F('balance') + Decimal(money)
    # )
def LatestQ(related_name, **kwargs):
    """
    Constructs a django.db.models.Q instance that allows queries to be
    executed against the latest associated reverse relation.

    N.B. this method is designed to be used in conjunction with
    timeseries.utils.TimeSeriesQuerySet.last_updated.

    Usage:

        Ad.objects.last_updated('rawdata').filter(
            LatestQ('rawdata', views__gt=1000)
        )
    """
    # Match only the related row whose created timestamp equals the
    # precomputed <related_name>_last_updated annotation.
    parsed_kwargs = {
        related_name + "__created": F(related_name + '_last_updated')
    }
    # items() works on both Python 2 and 3; iteritems() was removed in 3.
    for key, value in kwargs.items():
        parsed_kwargs[related_name + '__' + key] = value
    return Q(**parsed_kwargs)
def posts_issues():
    """Create data-quality issues, per jurisdiction, for posts whose
    membership count differs from their maximum_memberships."""
    all_jurs = Jurisdiction.objects.order_by('name')
    for jur in all_jurs:
        count = 0
        issues = IssueType.get_issues_for('post')
        for issue in issues:
            if issue == 'many-memberships':
                # More members than the post allows.
                queryset = Post.objects.filter(
                    organization__jurisdiction=jur).annotate(
                    num=Count('memberships')).filter(
                    num__gt=F('maximum_memberships'))
                count += create_issues(queryset, issue, jur)
            elif issue == 'few-memberships':
                # Fewer members than the post allows.
                queryset = Post.objects.filter(
                    organization__jurisdiction=jur).annotate(
                    num=Count('memberships')).filter(
                    num__lt=F('maximum_memberships'))
                count += create_issues(queryset, issue, jur)
            else:
                # Fail loudly if a new issue type appears without support here.
                raise ValueError("Posts Importer needs "
                                 "update for new issue.")
        print("Imported Post Related {} Issues for {}".format(count, jur.name)
              )
def test_date_query(self):
    """Date comparisons against F() + timedelta arithmetic must work."""
    # node1's v→y gap is exactly 3 days; node2's is 4 days.
    node1 = ExoticTypeNode.objects.create(v=date(1982, 9, 26), y=date(1982, 9, 29))
    node2 = ExoticTypeNode.objects.create(v=date(1982, 9, 26), y=date(1982, 9, 30))
    self.assertEqual(list(ExoticTypeNode.objects.filter(
        y__gt=F('v') + timedelta(days=3))), [node2])
    self.assertEqual(ExoticTypeNode.objects.filter(
        y__gt=F('v') + timedelta(days=3)).count(), 1)
    self.assertEqual(ExoticTypeNode.objects.filter(
        y__gte=F('v') + timedelta(days=3)).count(), 2)
    self.assertEqual(list(ExoticTypeNode.objects.filter(
        y__lte=F('v') + timedelta(days=3))), [node1])
    # Negative timedeltas are also valid in the expression.
    self.assertEqual(list(ExoticTypeNode.objects.filter(
        y__lte=F('v') + timedelta(days=-4))), [])
def record_view(request, post_id):
    """Record one view of a post per session and bump its view counter.

    A session is forced into existence if the visitor has none yet, so the
    dedupe key is always available.
    """
    post = get_object_or_404(Post, pk=post_id)
    session_id = request.session.session_key
    if not session_id:
        # First hit for this visitor: persist the session to get a key.
        request.session.save()
        session_id = request.session.session_key
    # .exists() avoids fetching rows just to test for presence.
    if not PostView.objects.filter(post=post, session=session_id).exists():
        view = PostView(post=post,
                        ip=request.META['REMOTE_ADDR'],
                        created=timezone.now(),
                        session=session_id)
        view.save()
        # F() makes the increment atomic at the database level.
        Post.objects.filter(pk=post_id).update(view_count=F('view_count') + 1)
def eligible_for_use(self, **kwargs: typing.Any) -> QuerySet:
    """Proxy to the queryset's eligible_for_use() filter."""
    return self.get_queryset().eligible_for_use(**kwargs)

# # This is commented because it didn't work in MySQL
# def need_new_archive(self, **kwargs):
#
#     return self.annotate(
#         num_archives=Count('archive')
#     ).filter(
#         (
#             (
#                 Q(num_archives=1) &
#                 ~Q(filesize=F('archive__filesize')) &
#                 ~Q(filesize=0)
#             ) |
#             Q(archive__isnull=True)
#         ),
#         **kwargs
#     ).prefetch_related('archive_set').order_by('-create_date')
def record_completion(experiment, user_id, variation, request):
    """Mark the experiment completed once per session and bump today's
    completion_count for the shown variation."""
    # abort if this user never started the experiment
    if experiment.id not in request.session.get('experiments_started', []):
        return
    # abort if this user has completed already
    experiments_completed = request.session.get('experiments_completed', [])
    if experiment.id in experiments_completed:
        return
    experiments_completed.append(experiment.id)
    request.session['experiments_completed'] = experiments_completed
    # get or create a History record for this experiment variation and the current date
    history, _ = ExperimentHistory.objects.get_or_create(
        experiment=experiment, variation=variation, date=datetime.date.today()
    )
    # increment the completion_count atomically in the database
    ExperimentHistory.objects.filter(pk=history.pk).update(
        completion_count=F('completion_count') + 1)
def transaction_save_handler(sender, instance, created, **kwargs):
    """Updates the Ledger balance after a transaction is saved.

    Args:
        instance: a Transaction instance

    Raises:
        ValueError: if an existing transaction was modified rather than
            created — transactions are append-only.
    """
    if not created:
        raise ValueError("Transactions should not be modified")
    # Atomically update the ledger's balance.
    with transaction.atomic():
        # Row lock prevents concurrent balance updates from racing.
        ledger = (Ledger.objects.select_for_update()
                  .get(id=instance.ledger.id))
        # check if billing is enabled
        if not ledger.network.billing_enabled:
            # Transaction creator should have checked before saving,
            # issue a warning
            logger.with_trace(
                logger.warning,
                "creator should check if billing is enabled"
            )
            return
        # F() pushes the addition into the UPDATE statement.
        ledger.balance = F('balance') + instance.amount
        ledger.save()
def annotate_queryset(self, queryset, request):
    """Annotate each row with a relevance score: the number of selected
    causes plus the number of selected skills it matches.

    NOTE(review): self.get_skills_and_causes(request) is called but its
    result is ignored, and the pk lists [1, 3] / [1, 2, 4] are hard-coded —
    presumably these should come from the request; confirm.
    """
    self.get_skills_and_causes(request)
    queryset = queryset\
        .annotate(
            cause_relevance=Count(
                Case(When(causes__pk__in=[1, 3], then=1),
                     output_field=IntegerField()),
                distinct=True),
            skill_relevance=Count(
                Case(When(skills__pk__in=[1, 2, 4], then=1),
                     output_field=IntegerField()),
                distinct=True))\
        .annotate(relevance=F('cause_relevance') + F('skill_relevance'))
    return queryset
def products_changed_handler(sender, **kwargs):
    """Signal handler: recompute the visible-product counters of the given
    categories after their product set changed.

    `categories` may be a ShopCategory instance, a single id (int/str), a
    collection of ids, or a ShopCategory queryset.
    """
    categories = kwargs.get('categories')
    if isinstance(categories, ShopCategory):
        # A single category instance.
        categories = ShopCategory.objects.filter(pk=categories.pk)
    elif isinstance(categories, (int, str)):
        # A single category id.
        categories = ShopCategory.objects.filter(pk=categories)
    elif isinstance(categories, (list, tuple, set, ValuesListQuerySet)):
        # A collection of category ids.
        categories = ShopCategory.objects.filter(pk__in=categories)
    elif isinstance(categories, QuerySet) and categories.model is ShopCategory:
        # Already a ShopCategory queryset — use as-is.
        pass
    else:
        raise TypeError('Invalid categories for signal "products_changed"')
    with transaction.atomic():
        # Count visible products per category directly in SQL.
        categories.update(
            product_count=RawSQL(
                '(SELECT COUNT(*) '
                'FROM shop_shopproduct AS ssp '
                'WHERE ssp.category_id = shop_shopcategory.id '
                'AND ssp.is_visible = TRUE)',
                ()
            )
        )
        # NOTE(review): total_product_count is set equal to product_count —
        # presumably it should also include descendant categories; confirm.
        categories.update(
            total_product_count=F('product_count')
        )
    categories_changed.send(ShopCategory, categories=categories)
def filter_approx_distance(self, queryset, value):
    """
    Filters results whose address object has a lat/long approximately
    within value[0] of (value[1], value[2]).

    Uses a flat-earth (equirectangular) approximation with a truncated
    cosine so the whole comparison runs in the database via F expressions.
    """
    # Assume value is in the form (distance, lat, long)
    try:
        vals = make_tuple(value)
    except Exception:
        # Malformed filter value: fall back to not filtering rather than
        # failing the whole request. (Was a bare `except:`, which also
        # swallowed KeyboardInterrupt/SystemExit.)
        return queryset
    # remove queryset objects that have no address
    queryset = queryset.filter(address_object__isnull=False)
    pi = 3.1415
    # Coordinate deltas in radians, computed DB-side.
    f_lat = pi * (vals[1] - F('address_object__latitude')) / 180.0
    f_long = pi * (vals[2] - F('address_object__longitude')) / 180.0
    m_lat = 0.5 * pi * (vals[1] + F('address_object__latitude')) / 180.0
    cosprox = 1 - (m_lat ** 2) / 2.0  # approximate cosine (2-term Taylor)
    # Squared distance in km² (6371 km = Earth radius).
    approx_dist = (6371 ** 2) * (f_lat ** 2 + (cosprox * f_long) ** 2)
    queryset = queryset.annotate(dist=(approx_dist - vals[0] ** 2)).annotate(flat=f_lat)
    queryset = queryset.filter(dist__lte=0)
    return queryset
def get_activity(self, limit=20, offset=0, distinct=False, friends_only=False, request=None):
    """Return recent posts from this user's friends and followees (and the
    user themselves unless friends_only), annotated with yeah/comment
    counts and whether the requesting user already gave a yeah.

    NOTE(review): `has_yeah` is only bound when request.user is
    authenticated, yet the post queries below reference it — for anonymous
    users this raises NameError; confirm intended nesting.
    """
    # Todo: make distinct work; combine friends and following, but then get posts from them
    friends = Friendship.get_friendships(self, 0)
    friend_ids = []
    for friend in friends:
        friend_ids.append(friend.other(self))
    follows = self.follow_source.filter().values_list('target', flat=True)
    if not friends_only:
        # Include the user's own posts and everyone they follow.
        friend_ids.append(self.id)
        for thing in follows:
            friend_ids.append(thing)
    if request.user.is_authenticated:
        has_yeah = Yeah.objects.filter(post=OuterRef('id'), by=request.user.id)
    if distinct:
        # Keep only each creator's most recent post.
        posts = Post.objects.select_related('creator').select_related('community').annotate(
            num_yeahs=Count('yeah', distinct=True),
            num_comments=Count('comment', distinct=True),
            yeah_given=Exists(has_yeah, distinct=True)).annotate(
            max_created=Max('creator__post__created')).filter(
            created=F('max_created')).filter(
            creator__in=friend_ids).order_by('-created')[offset:offset + limit]
    else:
        posts = Post.objects.select_related('creator').select_related('community').annotate(
            num_yeahs=Count('yeah', distinct=True),
            num_comments=Count('comment', distinct=True),
            yeah_given=Exists(has_yeah, distinct=True)).filter(
            creator__in=friend_ids).order_by('-created')[offset:offset + limit]
    if request:
        for post in posts:
            post.setup(request)
            # Replace the bound method with its result for template use.
            post.recent_comment = post.recent_comment()
    return posts
def viral_video_detail(request, id):
    """Show a viral video, labelling it popular/new/cool and recording one
    impression for the requesting device class (mobile vs desktop)."""
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    qs = ViralVideo.objects.annotate(
        # Total across both device counters, computed DB-side.
        total_impressions=models.F("desktop_impressions") + models.F("mobile_impressions"),
        label=models.Case(
            models.When(total_impressions__gt=POPULAR_FROM,
                        then=models.Value("popular")),
            models.When(created__gt=yesterday, then=models.Value("new")),
            default=models.Value("cool"),
            output_field=models.CharField(),
        ),
    )
    # DEBUG: check the SQL query that Django ORM generates
    print(qs.query)
    qs = qs.filter(pk=id)
    # Count the impression atomically on the device-specific column.
    if request.flavour == "mobile":
        qs.update(mobile_impressions=models.F("mobile_impressions") + 1)
    else:
        qs.update(desktop_impressions=models.F("desktop_impressions") + 1)
    video = get_object_or_404(qs)
    return render(request, "viral_videos/viral_video_detail.html", {'video': video})
def _set_index(self, index):
    """Move this entry to position `index` in the meeting timetable,
    shifting the entries in between so indices stay contiguous.

    Raises:
        IndexError: if `index` is outside [0, len(meeting)).
    """
    if index < 0 or index >= len(self.meeting):
        raise IndexError()
    old_index = self.timetable_index
    if index == old_index:
        return
    entries = self.meeting.timetable_entries.filter(timetable_index__isnull=False)
    if old_index > index:
        # Moving earlier: everything in [index, old_index) shifts down one.
        changed = entries.filter(
            timetable_index__gte=index, timetable_index__lt=old_index)
        changed.update(timetable_index=F('timetable_index') + 1)
    elif old_index < index:
        # Moving later: everything in (old_index, index] shifts up one.
        changed = entries.filter(
            timetable_index__gt=old_index, timetable_index__lte=index)
        changed.update(timetable_index=F('timetable_index') - 1)
    self.timetable_index = index
    self.save(force_update=True)
    self.meeting._clear_caches()
    on_meeting_top_index_change.send(
        Meeting, meeting=self.meeting, timetable_entry=self)
def refresh(self, **kwargs):
    """Update this timetable entry's fields, managing its visibility.

    `visible` (default True) controls whether the entry keeps a
    timetable_index: becoming visible appends it at the end; becoming
    invisible clears its index, compacts the remaining indices, and drops
    its participants.
    """
    visible = kwargs.pop('visible', True)
    previous_index = self.timetable_index
    # `is not None` replaces the unidiomatic `not ... is None`.
    to_visible = visible and previous_index is None
    from_visible = not visible and previous_index is not None
    if to_visible:
        # Append at the end of the timetable.
        last_index = self.meeting.timetable_entries.aggregate(
            models.Max('timetable_index'))['timetable_index__max']
        if last_index is None:
            kwargs['timetable_index'] = 0
        else:
            kwargs['timetable_index'] = last_index + 1
    elif from_visible:
        kwargs['timetable_index'] = None
    for k, v in kwargs.items():
        setattr(self, k, v)
    self.save()
    if from_visible:
        # Close the gap left by this entry's old position.
        changed = self.meeting.timetable_entries.filter(
            timetable_index__gt=previous_index)
        changed.update(timetable_index=F('timetable_index') - 1)
        # invisible tops don't have participants
        self.participations.all().delete()
    self.meeting._clear_caches()
    self.meeting.create_specialist_reviews()
def save(self, *args, **kwargs):
    """
    Run the mashup algorithm over the configured corpora, save the result,
    then link parent sentences and bump the source corpora's mash counts.
    """
    if self.mashup.algorithm == 'MJN':
        # Randomly order the corpora we're going to join, this will generate better output
        mashed, parent_sents = mashup_algorithms.mouse_join(self.mashup.corpora.all(), smashtag=True)
    else:
        # TODO: add other join methods.. MJN used for everything rn
        mashed, parent_sents = mashup_algorithms.mouse_join(self.mashup.corpora.all(), smashtag=True)
    self.body = mashed
    super().save(*args, **kwargs)
    # Save the parent sentences.. M2M needs an instance first, so we add them after saving
    self.sentences.add(*parent_sents)
    # Update mash counts of the source corpora (atomic DB-side increment).
    for corpus in self.mashup.corpora.all():
        Corpus.objects.filter(id=corpus.id).update(mash_count=F('mash_count') + 1)
def get_queryset(self):
    """Union past presentations with postponed meetings into one
    uniformly-shaped activity feed, newest first.

    Both branches expose the same four keys so union() can combine them;
    the skip branch pads the unused columns with empty strings.
    """
    empty_str = ExpressionWrapper(V(''), output_field=CharField())
    # The newest MeetingHistory row is the upcoming meeting — exclude it.
    future_meeting = models.MeetingHistory.objects.latest('date')
    return models.PresentHistory.objects.values(
        date=F('meeting__date'),
        presentation_type=F('present_type'),
        presenter_name=F('presenter__name'),
        present_content=F('content'),
    ).exclude(meeting__date=future_meeting.date).order_by().union(
        models.MeetingSkip.objects.all().values(
            'date',
            presentation_type=Concat(V('Postponed: '), 'reason'),
            presenter_name=empty_str,
            present_content=empty_str,
        ).filter(date__lte=date.today()).order_by()
    ).order_by('-date')
def process(self):
    """Insert the new node as the last child of self.node in a
    materialised-path tree, keeping the parent's numchild in sync."""
    newobj = self.kwargs['instance']
    newobj.depth = self.node.depth + 1
    if self.node.numchild == 0:
        # the node had no children, adding the first child
        newobj.path = _get_path(
            self.node.path, newobj.depth, 1)
        max_length = self.node_cls._meta.get_field('path').max_length
        if len(newobj.path) > max_length:
            raise Exception(
                'The new node is too deep in the tree, try'
                ' increasing the path.max_length property'
                ' and UPDATE your database')
    else:
        # adding the new child as the last one
        newobj.path = _inc_path(self.node.last_child)
    # saving the instance before returning it
    newobj.save()
    newobj._cached_parent_obj = self.node
    # Atomic DB-side increment of the parent's child counter.
    self.model.objects.filter(
        path=self.node.path).update(numchild=F('numchild') + 1)
    # we increase the numchild value of the object in memory
    self.node.numchild += 1
    return newobj
def range_statistics(start, end):
    """ Returns the aggregated statistics (totals, min/max/avg temperature)
    for all days in the half-open range [start, end). """
    return DayStatistics.objects.filter(day__gte=start, day__lt=end).aggregate(
        total_cost=Sum('total_cost'),
        electricity1=Sum('electricity1'),
        electricity1_cost=Sum('electricity1_cost'),
        electricity1_returned=Sum('electricity1_returned'),
        electricity2=Sum('electricity2'),
        electricity2_cost=Sum('electricity2_cost'),
        electricity2_returned=Sum('electricity2_returned'),
        # Merged tariff-1 + tariff-2 totals, computed DB-side via F().
        electricity_merged=Sum(models.F('electricity1') + models.F('electricity2')),
        electricity_cost_merged=Sum(models.F('electricity1_cost') + models.F('electricity2_cost')),
        electricity_returned_merged=Sum(models.F('electricity1_returned') + models.F('electricity2_returned')),
        gas=Sum('gas'),
        gas_cost=Sum('gas_cost'),
        temperature_min=Min('lowest_temperature'),
        temperature_max=Max('highest_temperature'),
        temperature_avg=Avg('average_temperature'),
    )
def handle_rollover(inst: Instrument, new_bar: DailyBar):
    """
    Handle a main-contract rollover for an instrument.

    basis = close of the new main contract - close of the old contract on
    the same day; all historical main-series OHLC/settlement values up to
    this date are shifted by the basis so the continuous series stays
    continuous. (Original docstring was mojibake — reconstructed from code.)
    """
    # The product code is the alphabetic prefix of the contract code.
    product_code = re.findall('[A-Za-z]+', new_bar.code)[0]
    old_bar = DailyBar.objects.filter(
        exchange=inst.exchange, code=inst.last_main, time=new_bar.time).first()
    main_bar = MainBar.objects.get(
        exchange=inst.exchange, product_code=product_code, time=new_bar.time)
    if old_bar is None:
        # No bar for the old contract on this day: basis is zero.
        old_close = new_bar.close
    else:
        old_close = old_bar.close
    basis = new_bar.close - old_close
    main_bar.basis = basis
    basis = float(basis)
    main_bar.save(update_fields=['basis'])
    # Shift every historical main bar up to this date by the basis.
    MainBar.objects.filter(exchange=inst.exchange, product_code=product_code,
                           time__lte=new_bar.time).update(
        open=F('open') + basis, high=F('high') + basis,
        low=F('low') + basis, close=F('close') + basis,
        settlement=F('settlement') + basis)
def link_ice_entry_to_study(self, user_token, strain, study):
    """
    Task runs the code to register a link between an ICE entry and an EDD study.

    :param user_token: the token used to identify a user to ICE
    :param strain: the primary key of the EDD main.models.Strain in the link
    :param study: the primary key of the EDD main.models.Study in the link
    :raises Exception: for any errors other than communication errors to ICE instance
    """
    # check that strain and study are still linked
    query = models.Strain.objects.filter(pk=strain, line__study__pk=study)
    if query.exists():
        try:
            ice = create_ice_connection(user_token)
            record = query.annotate(
                study_slug=F('line__study__slug'),
                study_name=F('line__study__name'),
            ).distinct().get()
            url = build_study_url(record.study_slug)
            ice.link_entry_to_study(record.registry_id, study, url, record.study_name)
        except RequestException as e:
            # Retry when there are errors communicating with ICE
            raise self.retry(exc=e, countdown=delay_calculation(self), max_retries=10)
        # Any other exception propagates naturally; the previous
        # `except Exception as e: raise e` clause was a no-op.
def descargar_datos(request):
    """Render the data-download page: approved-sample counts per period,
    broken down by region and for the whole country.

    Requires at least zone-coordinator authorisation; otherwise an error
    message page is rendered instead.
    """
    if not (hasattr(request.user, "perfil") and
            request.user.perfil.autorizacion >= PERMISO_COORD_ZONAL):
        messages.error(request, 'Permisos insuficientes.')
        return render(request, 'relevamiento/mensaje.html')
    regiones = Region.objects.all()
    # Per-region sample counts per (year, month, fortnight).
    muestras = Muestra.objects.filter(aprobada=True)\
        .values("anio", "mes", "quincena")\
        .annotate(region=F("planilla_de_relevamiento__zona__jurisdiccion__region__nombre"))\
        .annotate(region_id=F("planilla_de_relevamiento__zona__jurisdiccion__region__id"))\
        .annotate(cantidad=Count("id")).order_by("-anio", "-mes", "-quincena", "region")
    # Country-wide totals without the region breakdown.
    muestras_pais = Muestra.objects.filter(aprobada=True).values("anio", "mes", "quincena")\
        .annotate(cantidad=Count("id")).order_by("-anio", "-mes", "-quincena")
    return render(request, 'relevamiento/descargar_datos.html',
                  {"regiones": regiones, "muestras": muestras,
                   "muestras_pais": muestras_pais})
def like_comment(request):
    """Increment a comment's vote count atomically; rebuild the MPTT tree
    every fourth vote.

    Rate-limited requests are silently accepted; non-POST requests get 400;
    a missing comment_id gets 404.
    """
    if getattr(request, 'limited', False):
        # if the request was limited act as if everything went smoothly
        pass
        # return JsonResponse({'status': 'KIK!'})
    elif request.method != 'POST':
        # BUG FIX: the original tested `== 'POST'`, rejecting exactly the
        # requests it should accept. JsonResponse also requires a data
        # argument, which the original omitted.
        return JsonResponse({'status': 'Invalid method'}, status=400)
    try:
        comment_id = request.GET['comment_id']
    except LookupError:
        return JsonResponse({'status': 'Missing comment_id'}, status=404)
    with transaction.atomic():
        # Defer MPTT bookkeeping; a vote count change doesn't move nodes.
        with Comment.objects.disable_mptt_updates():
            comment = Comment.objects.get(pk=comment_id)
            comment.votes = F('votes') + 1
            comment.save()
    # TODO: still expensive, but much better
    if Comment.objects.get(pk=comment_id).votes % 4 == 0:
        Comment.objects.rebuild()
    return JsonResponse({'status': 'Success!'})
def get_queryset(self):
    """Events with calendar/tags preloaded and a participant count; hidden
    events are filtered out for unprivileged users, and list views without
    date filters default to upcoming events only."""
    queryset = (models.Event.objects.all()
                .select_related('calendar')
                .prefetch_related('tags')
                # Each RSVP counts its guests plus the respondent.
                .annotate(_participants=Sum(F('rsvps__guests') + 1))
                )
    if not self.request.user.has_perm('events.view_hidden_event'):
        queryset = queryset.filter(published=True)
    after_query = self.request.query_params.get('after', None)
    before_query = self.request.query_params.get('before', None)
    lookup_url_kwarg = self.lookup_url_kwarg or self.lookup_field
    # in the case there is no after_query parameters, and we are not on a single object page
    # we set a default after value of today
    if lookup_url_kwarg not in self.kwargs and after_query is None and before_query is None:
        queryset = queryset.upcoming(as_of=timezone.now(), published_only=False)
    return queryset
def usercp(request):
    """Render the user control panel: the user's subscribed threads with
    unread activity, newest first, paginated."""
    threads = (Thread.objects.all()
               .filter(
                   threadflag__poster_id=request.user.id,
                   threadflag__subscribed=True,
                   # Only threads updated since the user last read them.
                   last_update__gt=F('threadflag__last_read_date'))
               .order_by('-last_update'))
    threads_per_page = utils.get_config('threads_per_forum_page')
    paginator = utils.MappingPaginator(threads, threads_per_page)
    # Wrap each thread in a request-aware facade for the template.
    paginator.install_map_func(lambda t: utils.ThreadFascet(t, request))
    page = utils.page_by_request(paginator, request)
    ctx = {
        'threads': page
    }
    return render(request, 'user_cp.html', ctx)
def lines(self):
    """Lazily build the cart lines (and a pk -> quantity map) from either
    the DB-backed cart or the session cart, caching the result."""
    if self._lines is None:
        self._quantities = {}
        if self._cart is not None:
            # Alias product__pk to 'pk' so DB lines share the session shape.
            self._lines = self._cart.line_basket_sales.filter(removed=False).values(
                'product__pk', 'quantity'
            ).annotate(
                pk=F('product__pk')
            )
            for line in self._lines:
                # Drop the duplicate key, keeping only 'pk'.
                line.pop('product__pk')
                self._quantities[line['pk']] = line['quantity']
        elif self._session is not None:
            self._lines = self._session[ShoppingCartProxy.SESSION_KEY]
            self._quantities = {line['pk']: line['quantity'] for line in self._lines}
    return self._lines
def get_form(self, form_class=None):
    """Prefill the line-basket form with tax/price; for packs, also
    serialise the pack option choices for the GenForeignKey widget."""
    # form_kwargs = super(LineBasketUpdateModal, self).get_form_kwargs(*args, **kwargs)
    form = super(LineBasketUpdateModal, self).get_form(form_class)
    initial = form.initial
    initial['type_tax'] = self.object.product.product.tax.pk
    initial['price'] = self.object.total
    if self.__is_pack:
        options = []
        lang = get_language_database()
        for option in SalesLineBasketOption.objects.filter(line_budget__pk=self.__line_pk):
            initial['packs[{}]'.format(option.product_option.pk)] = option.product_final.pk
            a = {
                'id': option.product_option.pk,
                'label': getattr(option.product_option, lang).name,
                # Localised product names via the language-specific relation.
                'products': list(option.product_option.products_pack.all().values('pk').annotate(
                    name=F('{}__name'.format(lang)))),
                'selected': option.product_final.pk,
            }
            options.append(a)
        # compatibility with GenForeignKey
        initial['packs'] = json.dumps({'__JSON_DATA__': options})
    return form
def get_closest_station_data(zip_code):
    """Return the WeatherStation nearest to the given zip code.

    The zip code is resolved to a latitude/longitude pair, every station is
    annotated with its straight-line (Euclidean, in degrees) distance from
    that point, and the closest station record is returned.
    """
    target_lat, target_long = lawnutils.get_lat_long(zip_code)
    # Coordinate deltas as F() expressions, so the database computes the
    # distance for every station in a single query.
    lat_delta = target_lat - F('latitude')
    long_delta = target_long - F('longitude')
    # Plain Pythagoras on the deltas.
    distance_expr = (lat_delta ** 2 + long_delta ** 2) ** 0.5
    # Sort stations by that distance; the first row is the closest one.
    by_distance = (WeatherStation.objects.all()
                   .annotate(distance=distance_expr)
                   .order_by('distance'))
    return by_distance[0]
def get_notifications(request, employee_id):
    """
    Get all notifications for employee id
    ---
    response_serializer: activities.serializers.NotificationSerializer
    responseMessages:
    - code: 401
      message: Unauthorized. Authentication credentials were not provided. Invalid token.
    - code: 403
      message: Forbidden.
    - code: 404
      message: Not found
    - code: 500
      message: Internal Server Error
    """
    if request.method == 'GET':
        employee = get_object_or_404(Employee, pk=employee_id)
        # Personal activity entries addressed to this employee.
        activities = Activity.objects.annotate(
            profile=F('to_user')).values('datetime', 'text', 'profile').filter(
            to_user=employee)
        # Broadcast messages: to everyone, to the employee's location, or
        # directly to the employee.
        messages = Message.objects.annotate(
            profile=F('from_user')).values('datetime', 'text', 'profile').filter(
            Q(to_user='all') |
            Q(to_user=employee.location.name) |
            Q(to_user=employee.username))
        notifications = list(chain(activities, messages))
        # BUG FIX: sorting plain dicts raises TypeError on Python 3 — sort
        # the merged stream newest-first by its timestamp explicitly.
        notifications = sorted(notifications,
                               key=lambda item: item['datetime'],
                               reverse=True)
        paginator = PageNumberPagination()
        results = paginator.paginate_queryset(notifications, request)
        serializer = NotificationSerializer(results, many=True)
        return paginator.get_paginated_response(serializer.data)
def search_in_fields(self, queryset, search_query, start, stop):
    """Rank full-text matches on the precomputed '_search_' vector and
    return the requested slice, best matches first."""
    matched = self.get_in_fields_queryset(queryset, search_query)
    ranked = matched.annotate(
        _rank_=SearchRank(F('_search_'), search_query, weights=WEIGHTS_VALUES)
    ).order_by('-_rank_')
    return ranked[start:stop]
def get_context_data(self, **kwargs):
    """Get context data."""
    # Grab a sample of book IDs, then decrement their stock atomically with
    # an F() expression. update() returns the number of rows touched.
    sample_ids = Book.objects.values_list('id', flat=True)[:50]
    updated_count = Book.objects \
        .filter(id__in=sample_ids) \
        .update(stock_count=F('stock_count') - 1)
    return {'number_of_books_updated': updated_count}
def test_filter_F_expression(self):
    """Filtering one translated field against another via F() must work."""
    Blog.objects.create(title='foo', title_nl=20, title_fr=10)
    Blog.objects.create(title='bar', title_nl=20, title_fr=30)
    Blog.objects.create(title='baz', title_nl=20, title_fr=40)
    qs = Blog.objects.filter(title_nl__gt=F('title_fr'))
    self.assertEquals({m.title for m in qs}, {'foo'})
    qs = Blog.objects.filter(title_nl__lt=F('title_fr'))
    self.assertEquals({m.title for m in qs}, {'bar', 'baz'})
def test_filter_F_expressions_function(self):
    """Filtering a translated field against a DB function of itself must
    match only rows already in upper case."""
    Blog.objects.create(title='foo', title_nl='foo')
    Blog.objects.create(title='bar', title_nl='BAR')
    Blog.objects.create(title='baz', title_nl='BAZ')
    qs = Blog.objects.filter(
        title_nl=models.functions.Upper(F('title_nl'))
    )
    self.assertEquals({m.title for m in qs}, {'bar', 'baz'})
def test_regular_fields(self):
    """Descending ordering on a plain (untranslated) field must work."""
    descending = Blog.objects.all().order_by('-title')
    self.assertEquals(key(descending, 'title'), list('GFEDCBA'))
def test_order_asc(self):
    """Ordering by a translated field sorts by that field's values, not by
    the default-language titles."""
    ascending = Blog.objects.all().order_by('title_nl')
    self.assertEquals(key(ascending, 'title_nl'), sorted(self.NL))
    self.assertEquals(key(ascending, 'title'),
                      ['A', 'B', 'C', 'D', 'G', 'F', 'E'])
def allocate_stock(self, stock, quantity):
    """Reserve `quantity` units on `stock`.

    The increment is expressed with F() so the database performs it
    atomically, avoiding a read-modify-write race between concurrent
    allocations.
    """
    increment = F('quantity_allocated') + quantity
    stock.quantity_allocated = increment
    stock.save(update_fields=['quantity_allocated'])
def deallocate_stock(self, stock, quantity):
    """Release `quantity` previously reserved units on `stock`.

    The decrement is expressed with F() so the database performs it
    atomically, mirroring allocate_stock.
    """
    decrement = F('quantity_allocated') - quantity
    stock.quantity_allocated = decrement
    stock.save(update_fields=['quantity_allocated'])