我们从 Python 开源项目中提取了以下 14 个代码示例，用于说明如何使用 django.db.models.functions.Concat()。
def donations(request):
    """Return representatives, committees and summed donations as JSON.

    The payload has three keys:

    * ``representatives`` -- dict keyed by id, each row holding ``id``,
      ``name`` (first and last name joined by a space) and ``party``.
    * ``committees`` -- dict keyed by id, each row holding ``id`` and ``name``.
    * ``donations`` -- list of rows with ``source``, ``destination``,
      ``support`` and the summed ``amount``.
    """
    representative_rows = (
        Representative.objects.all()
        .annotate(name=Concat('first_name', Value(" "), 'last_name'))
        .values("id", "name", "party")
    )
    committee_rows = SuperPAC.objects.all().values("id", "name")
    # values() ahead of the Sum annotation makes the ORM aggregate per
    # distinct (source, destination, support) combination.
    donation_rows = (
        Donation.objects.all()
        .annotate(source=F("superpac_id"), destination=F("representative_id"))
        .values("source", "destination", "support")
        .annotate(amount=Sum("amount"))
    )

    data = {
        "representatives": {row["id"]: row for row in representative_rows},
        "committees": {row["id"]: row for row in committee_rows},
        "donations": list(donation_rows),
    }
    return JsonResponse(data)
def _lecturas_del_periodo(anio, mes, quincena=None, region_id=None, funcion=Avg):
    """Aggregate readings of approved samples for a given period.

    Args:
        anio: Year the samples must belong to.
        mes: Month the samples must belong to.
        quincena: Optional; when truthy, samples are also filtered by it.
        region_id: Optional; when truthy, readings are restricted to that
            region primary key.
        funcion: Aggregate class applied to ``precio`` (``Avg`` by default).

    Returns:
        A values queryset with ``comercio``, ``producto``, the aggregated
        ``valor`` and ``c_p`` (store name concatenated with product name),
        ordered by ``orden`` and then ``comercio``.
    """
    comercio_path = 'muestra__planilla_de_relevamiento__comercio__nombre'
    producto_path = "producto_con_marca__producto_generico__nombre"

    muestras_aprobadas = Muestra.objects.filter(anio=anio, mes=mes, aprobada=True)
    if quincena:
        muestras_aprobadas = muestras_aprobadas.filter(quincena=quincena)

    # Only readings with a positive price are considered.
    lecturas = Lectura.objects.filter(muestra__in=muestras_aprobadas, precio__gt=0)
    if region_id:
        lecturas = lecturas.filter(
            muestra__planilla_de_relevamiento__zona__jurisdiccion__region__pk=region_id)

    return (
        lecturas
        .annotate(orden=F("producto_con_marca__producto_generico__id"))
        .annotate(producto=F(producto_path))
        .annotate(comercio=F(comercio_path))
        .values('comercio', 'producto')
        .annotate(valor=funcion('precio'))
        .annotate(c_p=Concat(F(comercio_path), F(producto_path)))
        .order_by('orden', 'comercio')
    )
def update_list_best_practice():
    """Append ' e.V' to every community name with a single bulk UPDATE.

    The whole change is one ``QuerySet.update()`` call, so the database is
    hit once regardless of how many rows match.

    Caveats:
        - ``save()`` is not called and the related signals are not sent.
        - Does not work with m2m relationships.

    Roughly equivalent SQL::

        UPDATE community
        SET name = (community.name + ' e.V')
        WHERE community.name LIKE 'community%'
    """
    Community.objects.filter(name__startswith='community').update(
        name=Concat('name', Value(' e.V')),
    )


## Bulk delete ##############
def get_queryset(self):
    """Return presentation history merged with skipped meetings, newest first.

    Rows come from two sources combined with ``union()``:

    * ``PresentHistory`` entries, excluding those whose meeting date equals
      the date of the latest ``MeetingHistory`` record.
    * ``MeetingSkip`` entries dated today or earlier, shown with a
      ``'Postponed: <reason>'`` presentation type and empty presenter and
      content columns.

    Both halves expose the same columns in the same order, which the union
    relies on.
    """
    blank = ExpressionWrapper(V(''), output_field=CharField())
    latest_meeting = models.MeetingHistory.objects.latest('date')

    # order_by() with no arguments clears ordering on each half before the
    # union; the combined result is ordered once at the end.
    presented = (
        models.PresentHistory.objects
        .values(
            date=F('meeting__date'),
            presentation_type=F('present_type'),
            presenter_name=F('presenter__name'),
            present_content=F('content'),
        )
        .exclude(meeting__date=latest_meeting.date)
        .order_by()
    )
    skipped = (
        models.MeetingSkip.objects.all()
        .values(
            'date',
            presentation_type=Concat(V('Postponed: '), 'reason'),
            presenter_name=blank,
            present_content=blank,
        )
        .filter(date__lte=date.today())
        .order_by()
    )
    return presented.union(skipped).order_by('-date')
def list_employees_search(request, name):
    """Render the employee search page, optionally filtered by full name.

    Args:
        request: The incoming request; passed to ``get_list_for_role`` to
            obtain the base employee queryset (presumably limited to what
            the requesting administrator may see -- confirm in that helper).
        name: Text to look for in "<first name> <last name>"; the sentinel
            ``"all_true"`` disables name filtering.

    Returns:
        The rendered ``employee/employee_search.html`` template with
        ``employees`` set to the matching employees whose user is active.
    """
    candidates = get_list_for_role(request)

    # Skip the annotation entirely when no name filter was requested.
    if name != "all_true":
        full_name = Concat('user__first_name', V(' '), 'user__last_name')
        candidates = candidates.annotate(search_name=full_name).filter(
            search_name__icontains=name)

    context = {"employees": candidates.filter(user__is_active=True)}
    return render(request, "employee/employee_search.html", context)
def search(request):
    """Search all available sources for matches to the user's query.

    The ``query`` GET parameter is required; indexing ``request.GET`` raises
    when it is missing.

    * If the query is exactly five digits it is treated as a zipcode: the
      external Sunlight API is asked for representatives of that zipcode and
      each result is looked up in the local database by full name.
    * Otherwise representatives (by full name) and committees (by name) are
      matched case-insensitively, and bills are searched through the
      Sunlight API when the query contains a "." or is longer than five
      characters.

    Returns:
        A ``JsonResponse`` with ``representatives`` and ``committees`` and,
        for non-zipcode queries that trigger the bill search, ``bills``.
    """
    query = request.GET["query"]

    # Raw string literal: "\d" in a plain string is an invalid escape
    # sequence (a warning on current Python versions).
    if re.match(r"^\d{5}$", query):
        # Zipcode search: ask the external API which reps serve this zipcode.
        results = SunlightAPI().rep_by_zip(query)

        reps = []
        for rep in results["results"]:
            # Try to find each API result in our own database by full name.
            reps += (Representative.objects.all()
                     .annotate(name=Concat('first_name', Value(" "), 'last_name'))
                     .filter(name=rep["first_name"] + " " + rep["last_name"])
                     .values("id", "name", "party"))

        return JsonResponse({"representatives": reps, "committees": []})

    data = {}
    data["representatives"] = list(
        Representative.objects.all()
        .annotate(name=Concat('first_name', Value(" "), 'last_name'))
        .filter(name__icontains=query)
        .values("id", "name", "party"))
    data["committees"] = list(
        SuperPAC.objects.filter(name__icontains=query)
        .values("id", "name"))

    # Only hit the external bill search for queries that contain a "." or
    # are longer than five characters.
    if "." in query or len(query) > 5:
        results = SunlightAPI().search(query)
        data["bills"] = results["results"]

    return JsonResponse(data)
def get_queryset(self):
    """Return report durations summed per (year, month).

    Each row carries ``year``, ``month``, the summed ``duration`` and a
    synthetic ``pk`` of the form ``"<year>-<month>"``.
    """
    return (
        Report.objects.all()
        .annotate(year=ExtractYear('date'), month=ExtractMonth('date'))
        # values() before the Sum makes the aggregate group by year/month.
        .values('year', 'month')
        .annotate(duration=Sum('duration'))
        .annotate(pk=Concat('year', Value('-'), 'month'))
    )
def get_queryset(self):
    """Return user id rows annotated with a date and a synthetic pk.

    Each row holds the user ``id``, the ``date`` obtained from
    ``self._extract_date()`` and a ``pk`` of the form ``"<id>_<date>"``.
    Non-superusers only see themselves and the users they supervise.
    """
    date = self._extract_date()
    user = self.request.user

    users = get_user_model().objects.values('id').annotate(
        date=Value(date, DateField()),
    )

    # No explicit date means the last-reported-date filter is in effect; a
    # date can only be calculated for users with at least one absence or
    # report, so restrict the queryset to those users.
    if date is None:
        reporting_users = Report.objects.values('user').distinct()
        absent_users = Absence.objects.values('user').distinct()
        users = users.filter(id__in=reporting_users.union(absent_users))

    synthetic_pk = Concat('id', Value('_'), 'date', output_field=CharField())
    users = users.annotate(pk=synthetic_pk)

    if user.is_superuser:
        return users
    return users.filter(Q(id=user.id) | Q(supervisors=user))
def get_queryset(self):
    """Return absence type rows annotated for one user and one date.

    Each row holds the absence type ``id``, the constant ``date`` and
    ``user`` id, and a synthetic ``pk`` of the form
    ``"<user>_<id>_<date>"``.

    Only the user themselves, a superuser, or one of the user's supervisors
    may see the balances; anyone else gets an empty queryset.
    """
    date = self._extract_date()
    user = self._extract_user()

    absence_types = (
        models.AbsenceType.objects.values('id')
        .annotate(date=Value(date, DateField()))
        .annotate(user=Value(user.id, IntegerField()))
        .annotate(
            pk=Concat(
                'user', Value('_'), 'id', Value('_'), 'date',
                output_field=CharField(),
            ),
        )
    )

    # Short-circuit order matters: the supervisee lookup hits the database
    # and is only reached when the cheaper checks fail.
    viewer = self.request.user
    may_view = (
        viewer.is_superuser
        or viewer.id == user.id
        or viewer.supervisees.filter(id=user.id).exists()
    )
    if not may_view:
        return models.AbsenceType.objects.none()
    return absence_types
def get_queryset(self, request):
    """Return the admin queryset with sortable author/page annotations.

    On Django 1.8 and later each row gains ``author_sort`` and
    ``page_sort``, the text concatenation of the respective ``*_type`` and
    ``*_id`` columns. On older versions the queryset is returned untouched.
    """
    qs = super(CommentAdmin, self).get_queryset(request)

    # Concat is not available in Django 1.7 and 1.6.
    if django.VERSION < (1, 8):
        return qs

    author_key = Concat('author_type', 'author_id', output_field=TextField())
    page_key = Concat('page_type', 'page_id', output_field=TextField())
    qs = qs.annotate(author_sort=author_key)
    return qs.annotate(page_sort=page_key)
def update_output(cls, execution_id, stdout, stderr):
    """Append new text to an execution's stored stdout and stderr.

    The append is expressed as a ``Concat`` inside a single ``update()``
    call instead of loading the row and saving it again.

    Args:
        execution_id: Id of the ``Execution`` row to update.
        stdout: Text to append to ``stdout``; falsy values append nothing.
        stderr: Text to append to ``stderr``; falsy values append nothing.
    """
    appended = {
        'stdout': functions.Concat('stdout', models.Value(stdout or '')),
        'stderr': functions.Concat('stderr', models.Value(stderr or '')),
    }
    Execution.objects.filter(id=execution_id).update(**appended)
def resolve_search(self, args, context, info):
    """Return people, houses and streets whose name contains ``args['q']``.

    Houses have no stored name of their own here; they are matched on
    "<number> <street name>" built with ``Concat``. All matches are
    case-insensitive. The result is a lazy ``itertools.chain`` over the
    three querysets, in that order.
    """
    needle = args['q']
    house_label = Concat(
        F('number'), Value(' '), F('street__name'),
        output_field=models.TextField(),
    )

    matching_people = Person.objects.filter(name__icontains=needle)
    matching_houses = House.objects.annotate(name=house_label).filter(
        name__icontains=needle)
    matching_streets = Street.objects.filter(name__icontains=needle)

    return itertools.chain(matching_people, matching_houses, matching_streets)
def match_form501s_by_name(self):
    """Get Form501Filings whose name matches the scraped candidate's name.

    Filings are first limited to the candidate's office type, district and
    elections up to the parsed election year. Two name formats are then
    tried in order:

    1. ``"<last_name>, <first_name> <middle_name>"``
    2. ``"<last_name>, <first_name>"``

    For each format the result is narrowed to the parsed election type when
    that still yields rows. The second format is returned even when it
    matches nothing.

    Returns:
        A Form501Filing QuerySet.
    """
    from calaccess_processed.models import Form501Filing

    election_data = self.election_proxy.parsed_name
    office_data = self.parse_office_name()

    candidates = Form501Filing.objects.filter(
        office__iexact=office_data['type'],
        district=office_data['district'],
        election_year__lte=election_data['year'],
    )

    # First attempt: "<last_name>, <first_name> <middle_name>".
    with_middle = candidates.annotate(
        full_name=Concat(
            'last_name',
            Value(', '),
            'first_name',
            Value(' '),
            'middle_name',
            output_field=CharField(),
        ),
    ).filter(full_name=self.name)

    # Prefer matches in the same election type when there are any.
    if with_middle.filter(election_type=election_data['type']).exists():
        return with_middle.filter(election_type=election_data['type'])
    # Otherwise accept matches from any election type.
    if with_middle.exists():
        return with_middle

    # Second attempt: "<last_name>, <first_name>".
    without_middle = candidates.annotate(
        full_name=Concat(
            'last_name',
            Value(', '),
            'first_name',
            output_field=CharField(),
        ),
    ).filter(full_name=self.name)

    if without_middle.filter(election_type=election_data['type']).exists():
        return without_middle.filter(election_type=election_data['type'])
    return without_middle
def test_annotation_is_perserved():
    """An annotation added in the resolver must reach the node's field.

    The connection resolver annotates reporters with ``full_name``; the
    node type exposes that value as ``fullName``. Querying the first
    reporter should return the concatenated name.
    """
    Reporter.objects.create(
        first_name='John',
        last_name='Doe',
    )

    class ReporterType(DjangoObjectType):
        full_name = String()

        class Meta:
            model = Reporter
            interfaces = (Node, )
            filter_fields = ()

        def resolve_full_name(instance, info, **args):
            # Reads the value attached by the annotate() call below.
            return instance.full_name

    class Query(ObjectType):
        all_reporters = DjangoFilterConnectionField(ReporterType)

        def resolve_all_reporters(self, info, **args):
            full_name = Concat(
                'first_name', Value(' '), 'last_name',
                output_field=TextField(),
            )
            return Reporter.objects.annotate(full_name=full_name)

    query = ''' query NodeFilteringQuery { allReporters(first: 1) { edges { node { fullName } } } } '''
    expected_node = {'fullName': 'John Doe'}
    expected = {'allReporters': {'edges': [{'node': expected_node}]}}

    result = Schema(query=Query).execute(query)

    assert not result.errors
    assert result.data == expected