我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.db.models.Q。
def make_inactive_productlist_query(queryset): now = timezone.now() # Create a query of things are definitively inactive. Some of the ones # filtered here might be out of stock, but we include that later. inactive_candidates = ( queryset .exclude( Q(active=True) & (Q(deactivate_date=None) | Q(deactivate_date__gte=now))) .values("id") ) inactive_out_of_stock = ( queryset .filter(sale__timestamp__gt=F("start_date")) .annotate(c=Count("sale__id")) .filter(c__gte=F("quantity")) .values("id") ) return ( queryset .filter( Q(id__in=inactive_candidates) | Q(id__in=inactive_out_of_stock)) )
def get_foreign(self, queryset, search, filters): # Filter with search string query = [Q(code__icontains=search), ] for lang in settings.LANGUAGES_DATABASES: query.append(Q(**{"{}__name__icontains".format(lang.lower()): search})) qs = queryset.filter( reduce(operator.or_, query) ) category = filters.get('ProductForm_category', None) if category is None: category = filters.get('ProductFormCreate_category', None) if category is None: category = filters.get('ProductFormCreateCustom_category', None) if category: qs = qs.filter(category__pk=category) return qs[:settings.LIMIT_FOREIGNKEY]
def get_foreign(self, queryset, search, filters): # Filter with search string qsobject = Q(model__icontains=search) for lang in settings.LANGUAGES_DATABASES: qsobject |= Q(**{"{}__name__icontains".format(lang.lower()): search}) qsobject |= Q(**{"{}__description_short__icontains".format(lang.lower()): search}) qsobject |= Q(**{"family__{}__name__icontains".format(lang.lower()): search}) qsobject |= Q(**{"category__{}__name__icontains".format(lang.lower()): search}) qsobject |= Q(**{"subcategory__{}__name__icontains".format(lang.lower()): search}) queryset = queryset.filter(qsobject) return queryset[:settings.LIMIT_FOREIGNKEY] # ###########################################
def make_active_productlist_query(queryset): now = timezone.now() # Create a query for the set of products that MIGHT be active. Might # because they can be out of stock. Which we compute later active_candidates = ( queryset .filter( Q(active=True) & (Q(deactivate_date=None) | Q(deactivate_date__gte=now))) ) # This query selects all the candidates that are out of stock. candidates_out_of_stock = ( active_candidates .filter(sale__timestamp__gt=F("start_date")) .annotate(c=Count("sale__id")) .filter(c__gte=F("quantity")) .values("id") ) # We can now create a query that selects all the candidates which are not # out of stock. return ( active_candidates .exclude( Q(start_date__isnull=False) & Q(id__in=candidates_out_of_stock)))
def youtube_search(request): """ ??? ??? ????? ??? ?? """ context = dict() q = request.GET.get('q') if q: # YouTube????? ???? data = youtube.search(q) for item in data['items']: Video.objects.create_from_search_result(item) re_pattern = ''.join(['(?=.*{})'.format(item) for item in q.split()]) videos = Video.objects.filter( Q(title__iregex=re_pattern) | Q(description__iregex=re_pattern) ) context['videos'] = videos return render(request, 'post/youtube_search.html', context)
def get_messages(request, employee_id): """ Get all messages for employee id --- response_serializer: activities.serializers.MessageSerializer responseMessages: - code: 401 message: Unauthorized. Authentication credentials were not provided. Invalid token. - code: 403 message: Forbidden. - code: 404 message: Not found - code: 500 message: Internal Server Error """ if request.method == 'GET': employee = get_object_or_404(Employee, pk=employee_id) messages = Message.objects.filter( Q(to_user='all') | Q(to_user=employee.location.name) | Q(to_user=employee.username)) paginator = PageNumberPagination() results = paginator.paginate_queryset(messages, request) serializer = MessageSerializer(results, many=True) return paginator.get_paginated_response(serializer.data)
def filter_snp_list(request, query, exclude): snp_list = request.GET.get('snp_list', '') snp_list = snp_list.split('\r\n') if snp_list[0] != '': safe_snp_list = [] for row in snp_list: row = row.split(',') for item in row: safe_snp_list.append(item.strip()) query['variant_id__in'] = safe_snp_list #exclude snp_list exclude_snp_list = request.GET.get('exclude_snp_list', '') exclude_snp_list = exclude_snp_list.split('\r\n') if exclude_snp_list[0] != '': safe_exclude_snp_list = [] for row in exclude_snp_list: row = row.split(',') for item in row: safe_exclude_snp_list.append(item.strip()) exclude['variant_id__in']=safe_exclude_snp_list#args.append(~Q(variant_id__in=safe_snp_list))
def filter_mutation_type(request, args): genotype = request.GET.get('genotype', '') if genotype != '': print('genotype', genotype) args.append(Q(genotype=genotype)) mutation_type = request.GET.get('mutation_type', '') if mutation_type == 'homozygous': # genotypes = ['0/0', './.', '0/1', '1/0', '0/2', '2/0'] args.append(Q(mutation_type='HOM')) elif mutation_type == 'heterozygous': #genotypes = ['0/0', './.', '1/1', '2/1', '1/2', '2/2'] args.append(Q(mutation_type='HET'))
def filter_by_cadd(request, args): cadd = request.GET.get('cadd', '') if cadd != '': cadd_exclude = request.GET.get('cadd_exclude', '') if cadd_exclude == 'on': cadd_flag = True # print('CADD Flag on') else: cadd_flag = False cadd = cadd.split(' - ') cadd_min = float(cadd[0]) cadd_max = float(cadd[1]) # print('CADD', cadd_min, cadd_max) if cadd_flag: args.append(Q(cadd__lte=cadd_max) & Q(cadd__gte=cadd_min)) else: args.append((Q(cadd__lte=cadd_max) & Q(cadd__gte=cadd_min)) | Q(cadd__isnull=True))
def filter_by_mcap(request, args): mcap = request.GET.get('mcap', '') if mcap != '': mcap_exclude = request.GET.get('mcap_exclude', '') if mcap_exclude == 'on': mcap_flag = True # print('CADD Flag on') else: mcap_flag = False mcap = mcap.split(' - ') mcap_min = float(mcap[0]) mcap_max = float(mcap[1]) # print('CADD', cadd_min, cadd_max) if mcap_flag: args.append(Q(mcap_score__lte=mcap_max) & Q(mcap_score__gte=mcap_min)) else: args.append((Q(mcap_score__lte=mcap_max) & Q(mcap_score__gte=mcap_min)) | Q(mcap_score__isnull=True))
def filter_cgd(request, args): cgdmanifestation = request.GET.getlist('cgdmanifestation') # interventions = request.GET.getlist('interventions') conditions = request.GET.getlist('cgd') # print 'FILTER BY CGD' # print manifestations, interventions if len(cgdmanifestation) > 0: if cgdmanifestation[0] != '': #get a list of CGDENTRY cgdentries = CGDEntry.objects.filter(MANIFESTATION_CATEGORIES__in=cgdmanifestation) gene_list = [] for gene in cgdentries: gene_list.append(gene.GENE) args.append(Q(gene__in=gene_list)) if len(conditions) > 0: if conditions[0] != '': cgdentries = CGDEntry.objects.filter(CONDITIONS__in=conditions) gene_list = [] for gene in cgdentries: gene_list.append(gene.GENE) args.append(Q(gene__in=gene_list))
def filter_omim(request, args): omim = request.GET.getlist('omim') print('omim', omim) # print 'FILTER BY CGD' # print manifestations, interventions print('FILTER OMIM') if len(omim) > 0: if omim[0] != '': omimentries = Disease.objects.filter(id__in=omim) gene_list = [] print('omimentries', omimentries) genes = GeneDisease.objects.filter(diseases__in=omimentries) print('omimgenes', genes) for gene in genes: gene_list.append(gene.official_name) args.append(Q(gene__in=gene_list))
def filter_hgmd(request, args): hgmd = request.GET.getlist('hgmd') # print 'omim', omim # print 'FILTER BY CGD' # print manifestations, interventions # print 'FILTER OMIM' if len(hgmd) > 0: if hgmd[0] != '': hgmdentries = HGMDPhenotype.objects.filter(id__in=hgmd) gene_list = [] # print 'hgmdentries', hgmdentries genes = HGMDGene.objects.filter(diseases__in=hgmdentries) # print 'hgmdgenes', genes for gene in genes: gene_list.append(gene.symbol) args.append(Q(gene__in=gene_list))
def filter_sift(request, args): sift = request.GET.get('sift', '') if sift != '': sift_exclude = request.GET.get('sift_exclude', '') if sift_exclude == 'on': sift_flag = True else: sift_flag = False sift_option = request.GET.get('sift_option', '') if sift_option == '<': if sift_flag: args.append(Q(sift__lte=float(sift))) else: args.append(Q(sift__lte=float(sift)) | Q(sift__isnull=True)) elif sift_option == '>': if sift_flag: args.append(Q(sift__gte=float(sift))) else: args.append(Q(sift__gte=float(sift)) | Q(sift__isnull=True)) elif sift_option == '=': if sift_flag: args.append(Q(sift=float(sift))) else: args.append(Q(sift=float(sift)) | Q(sift__isnull=True))
def filter_dbsnp_build(request, args): dbsnp_build = request.GET.get('dbsnp_build', '') if dbsnp_build != '': dbsnp_option = request.GET.get('dbsnp_option', '') if dbsnp_option == '<': # build_list = range(1, int(dbsnp_build)) # T2 = [str(x) for x in build_list] # T2.append('') # query['dbsnp_build__in'] = #T2 args.append(Q(dbsnp_build__lte=int(dbsnp_build)) | Q(dbsnp_build__isnull=True)) elif dbsnp_option == '>': # build_list = range(int(dbsnp_build), 138) # T2 = [str(x) for x in build_list] # T2.append('') # query['dbsnp_build__in'] = T2 args.append(Q(dbsnp_build__gte=int(dbsnp_build)) | Q(dbsnp_build__isnull=True))
def get_genes(summary): genes ={} genes['genes'] = Gene.objects.filter(symbol__in=list(summary['genes'])).values('symbol', 'diseases').prefetch_related('diseases') genes['genes_hgmd'] = HGMDGene.objects.filter(symbol__in=list(summary['genes'])).prefetch_related('diseases') #for each gene name create a query queries = [Q(genes__icontains=value) for value in list(summary['genes'])] if len(queries) > 0: query = queries.pop() for item in queries: query |= item genes['genes_omim'] = GeneDisease.objects.filter(official_name__in=list(summary['genes'])).prefetch_related('diseases').order_by('official_name')#query else: genes['genes_omim'] = [] #get genes at CGD genes['genes_cgd'] = CGDEntry.objects.filter(GENE__in=list(summary['genes'])).prefetch_related('CONDITIONS').order_by('GENE') return genes #This is the most important function
def filter_queryset(self, term, queryset=None): """ Return QuerySet filtered by search_fields matching the passed term. Args: term (str): Search term Returns: QuerySet: Filtered QuerySet """ if queryset is None: queryset = self.get_queryset() search_fields = self.get_search_fields() select = Q() term = term.replace('\t', ' ') term = term.replace('\n', ' ') for t in [t for t in term.split(' ') if not t == '']: select &= reduce(lambda x, y: x | Q(**{y: t}), search_fields, Q(**{search_fields[0]: t})) return queryset.filter(select).distinct()
def get_for_objects(self, queryset): """ Get log entries for the objects in the specified queryset. :param queryset: The queryset to get the log entries for. :type queryset: QuerySet :return: The LogAction objects for the objects in the given queryset. :rtype: QuerySet """ if not isinstance(queryset, QuerySet) or queryset.count() == 0: return self.none() content_type = ContentType.objects.get_for_model(queryset.model) primary_keys = queryset.values_list(queryset.model._meta.pk.name, flat=True) if isinstance(primary_keys[0], integer_types): return self.filter(content_type=content_type).filter(Q(object_id__in=primary_keys)).distinct() else: return self.filter(content_type=content_type).filter(Q(object_pk__in=primary_keys)).distinct()
def search(self, terms): """ Produces an icontains lookup on the question title. """ query = None # Query to search for every search term # Build the query with Q and icontains for term in normalize_query(terms): q = models.Q(text__icontains=term) | models.Q(details__icontains=term) query = q if query is None else query & q return self.filter(query) ########################################################################## ## Questions Manager ##########################################################################
def dispatch(self, request, *args, **kwargs): search = request.GET.get('search') if not search or len(search) < 2: return JsonResponse({'count': 0, 'results': []}) queryset = Member.objects.filter( Q(name__icontains=search) | Q(profile_profile__nick__icontains=search) ) return JsonResponse({ 'count': len(queryset), 'results': [ { 'id': member.pk, 'nick': member.profile_profile.nick, 'name': member.name, } for member in queryset ], })
def group_list(request): """ list user group ????? """ header_title, path1, path2 = '?????', '????', '?????' keyword = request.GET.get('search', '') user_group_list = UserGroup.objects.all().order_by('name') group_id = request.GET.get('id', '') if keyword: user_group_list = user_group_list.filter(Q(name__icontains=keyword) | Q(comment__icontains=keyword)) if group_id: user_group_list = user_group_list.filter(id=int(group_id)) user_group_list, p, user_groups, page_range, current_page, show_first, show_end = pages(user_group_list, request) return my_render('juser/group_list.html', locals(), request)
def user_list(request): user_role = {'SU': u'?????', 'GA': u'????', 'CU': u'????'} header_title, path1, path2 = '????', '????', '????' keyword = request.GET.get('keyword', '') gid = request.GET.get('gid', '') users_list = User.objects.all().order_by('username') if gid: user_group = UserGroup.objects.filter(id=gid) if user_group: user_group = user_group[0] users_list = user_group.user_set.all() if keyword: users_list = users_list.filter(Q(username__icontains=keyword) | Q(name__icontains=keyword)).order_by('username') users_list, p, users, page_range, current_page, show_first, show_end = pages(users_list, request) return my_render('juser/user_list.html', locals(), request)
def group_list(request): """ list asset group ????? """ header_title, path1, path2 = u'?????', u'????', u'?????' keyword = request.GET.get('keyword', '') asset_group_list = AssetGroup.objects.all() group_id = request.GET.get('id') if group_id: asset_group_list = asset_group_list.filter(id=group_id) if keyword: asset_group_list = asset_group_list.filter(Q(name__contains=keyword) | Q(comment__contains=keyword)) asset_group_list, p, asset_groups, page_range, current_page, show_first, show_end = pages(asset_group_list, request) return my_render('jasset/group_list.html', locals(), request)
def perm_rule_list(request): """ list rule page ?????? """ # ???? header_title, path1, path2 = "????", "????", "????" # ?????? rules_list = PermRule.objects.all() rule_id = request.GET.get('id') # TODO: ????? keyword = request.GET.get('search', '') if rule_id: rules_list = rules_list.filter(id=rule_id) if keyword: rules_list = rules_list.filter(Q(name__icontains=keyword)) rules_list, p, rules, page_range, current_page, show_first, show_end = pages(rules_list, request) return my_render('jperm/perm_rule_list.html', locals(), request)
def perm_role_list(request): """ list role page """ # ???? header_title, path1, path2 = "????", "??????", "??????" # ???????? roles_list = PermRole.objects.all() role_id = request.GET.get('id') # TODO: ????? keyword = request.GET.get('search', '') if keyword: roles_list = roles_list.filter(Q(name=keyword)) if role_id: roles_list = roles_list.filter(id=role_id) roles_list, p, roles, page_range, current_page, show_first, show_end = pages(roles_list, request) return my_render('jperm/perm_role_list.html', locals(), request)
def perm_sudo_list(request): """ list sudo commands alias :param request: :return: """ # ???? header_title, path1, path2 = "Sudo??", "????", "????" # ????sudo ???? sudos_list = PermSudo.objects.all() # TODO: ????? keyword = request.GET.get('search', '') if keyword: sudos_list = sudos_list.filter(Q(name=keyword)) sudos_list, p, sudos, page_range, current_page, show_first, show_end = pages(sudos_list, request) return my_render('jperm/perm_sudo_list.html', locals(), request)
def searched(request): global filtered, lname, lcity, ltype bool = False if request.method=="POST": form = SearchForm(request.POST) if form.is_valid(): lname = form.cleaned_data['Name'] lcity = form.cleaned_data['City'] ltype = form.cleaned_data['Type'] bool = True filtered = Location.objects.filter(Q(name=lname) | Q(city=lcity) | Q(locationtype=ltype)) else: filtered = "" else: form = SearchForm() filtered = "" context = { 'filtered': filtered, 'form': form, 'bool': bool, } return render(request, 'auscities/result.html', context, {'form':form})
def author(self, request, author=None): """listing of posts by a specific author""" if not author: # Invalid author filter raise Http404('Invalid Author') posts = self.posts.filter( models.Q(owner__username=author) | models.Q(owner__username=unslugify(author))) return render(request, self.get_template(request), {'self': self, 'posts': self._paginate(request, posts), 'filter_type': 'author', 'filter': author})
def tag(self, request, tag=None): """listing of posts in a specific tag""" if not tag: # Invalid tag filter raise Http404('Invalid Tag') posts = self.posts.filter( models.Q(tags__name=tag) | models.Q(tags__name=unslugify(tag))) return render(request, self.get_template(request), {'self': self, 'posts': self._paginate(request, posts), 'filter_type': 'tag', 'filter': tag})
def filter_folder(self, qs, terms=()): # Source: https://github.com/django/django/blob/1.7.1/django/contrib/admin/options.py#L939-L947 flake8: noqa def construct_search(field_name): if field_name.startswith('^'): return "%s__istartswith" % field_name[1:] elif field_name.startswith('='): return "%s__iexact" % field_name[1:] elif field_name.startswith('@'): return "%s__search" % field_name[1:] else: return "%s__icontains" % field_name for term in terms: filters = models.Q() for filter_ in self.search_fields: filters |= models.Q(**{construct_search(filter_): term}) for filter_ in self.get_owner_filter_lookups(): filters |= models.Q(**{filter_: term}) qs = qs.filter(filters) return qs
def search(request): if request.method == 'GET': search_text = request.GET.get('search_text') groupname = request.session.get('user_group') hasgroup = Group.objects.get(name=groupname) print search_text print hasgroup.name filelist = File.objects.filter(Q(name__icontains=search_text)&(Q(perm__groups=hasgroup)|Q(ispublic="True"))).distinct() # filelist = File.objects.filter((Q(name=search_text)|Q(ispublic="True"))).distinct() listoffiledir=list(filelist) folderlist = {} template = loader.get_template('wxWeb/index2.html') context = Context({ 'filepath': listoffiledir, 'foldlist': folderlist, }) return HttpResponse(template.render(context))
def get_reverse_related_filter(self, obj): """ Complement to get_forward_related_filter(). Return the keyword arguments that when passed to self.related_field.model.object.filter() select all instances of self.related_field.model related through this field to obj. obj is an instance of self.model. """ base_filter = { rh_field.attname: getattr(obj, lh_field.attname) for lh_field, rh_field in self.related_fields } descriptor_filter = self.get_extra_descriptor_filter(obj) base_q = Q(**base_filter) if isinstance(descriptor_filter, dict): return base_q & Q(**descriptor_filter) elif descriptor_filter: return base_q & descriptor_filter return base_q
def _filter_or_exclude(self, negate, *args, **kwargs): ''' Annotate lookups for `filter()` and `exclude()`. Examples: - `title_nl__contains='foo'` will add an annotation for `title_nl` - `title_nl='bar'` will add an annotation for `title_nl` - `title_i18n='foo'` will add an annotation for a coalesce of the current active language, and all items of the fallback chain. - `Q(title_nl__contains='foo') will add an annotation for `title_nl` In all cases, the field part of the field lookup will be changed to use the annotated verion. ''' # handle Q expressions / args new_args = [] for arg in args: new_args.append(self._rewrite_Q(arg)) # handle the kwargs new_kwargs = {} for field, value in kwargs.items(): new_kwargs.update(dict((self._rewrite_filter_clause(field, value), ))) return super(MultilingualQuerySet, self)._filter_or_exclude(negate, *new_args, **new_kwargs)
def create_assignments(cls, proposal, origin=AUTO_ASSIGNED_INITIAL): speakers = [proposal.speaker] + list(proposal.additional_speakers.all()) reviewers = User.objects.exclude( pk__in=[ speaker.user_id for speaker in speakers if speaker.user_id is not None ] ).filter( groups__name="reviewers", ).filter( Q(reviewassignment__opted_out=False) | Q(reviewassignment=None) ).annotate( num_assignments=models.Count("reviewassignment") ).order_by( "num_assignments", ) for reviewer in reviewers[:3]: cls._default_manager.create( proposal=proposal, user=reviewer, origin=origin, )
def validate_email(self): """Ensure emails are unique across the models tracking emails. Since it's essential to keep email addresses unique to support our workflows, a `ValidationError` will be raised if the email trying to be saved is already assigned to some other user. """ lookup = Q(email__iexact=self.email) if self.pk is not None: # When there's an update, ensure no one else has this address lookup &= ~Q(user=self) try: EmailAddress.objects.get(lookup) except EmailAddress.DoesNotExist: pass else: raise ValidationError({ 'email': [_('This email address already exists.')] })
def get_context_data(self, **kwargs): # ?????????kwargs['query_data'], ??template?? kwargs['query_data'] = self.request.GET.dict() region = self.request.GET.get('region') subject = self.request.GET.get('subject') prices = None if region and region.isdigit(): prices = models.Price.objects.filter(region_id=region) if subject and subject.isdigit(): prices = prices and prices.filter(ability__subject_id=subject) all_levels= models.Level.objects.all() all_grades = models.Grade.objects.filter(leaf=True) all_subject = models.Subject.objects.all() self.build_context(kwargs, prices, all_levels, all_grades) kwargs['grade_list'] = all_grades kwargs['subject_list'] = all_subject kwargs['region_list'] = models.Region.objects.filter(Q(opened=True)|Q(name='??')) return super(LevelPriceConfigView, self).get_context_data(**kwargs)
def get_queryset(self, *args, **kwargs): """ Return all variant data, or a subset if a specific list is requested. """ queryset = super(VariantViewSet, self).get_queryset(*args, **kwargs) variant_list_json = self.request.query_params.get('variant_list', None) if not variant_list_json: return queryset variant_list = json.loads(variant_list_json) # Combine the variant list to make a single db query. Q_obj = None for variant_lookup in variant_list: if variant_lookup.isdigit(): filter_kwargs = {'id': variant_lookup} else: filter_kwargs = self._custom_variant_filter_kwargs(variant_lookup) if filter_kwargs: if not Q_obj: Q_obj = Q(**filter_kwargs) else: Q_obj = Q_obj | Q(**filter_kwargs) queryset = queryset.filter(Q_obj) return queryset
def get_limit_choices_to_from_path(model, path): """ Return Q object for limiting choices if applicable. If final model in path is linked via a ForeignKey or ManyToManyField which has a `limit_choices_to` attribute, return it as a Q object. """ fields = get_fields_from_path(model, path) fields = remove_trailing_data_field(fields) limit_choices_to = ( fields and hasattr(fields[-1], 'rel') and getattr(fields[-1].rel, 'limit_choices_to', None)) if not limit_choices_to: return models.Q() # empty Q elif isinstance(limit_choices_to, models.Q): return limit_choices_to # already a Q else: return models.Q(**limit_choices_to) # convert dict to Q
def index(request, page): if not request.user.is_authenticated: return redirect('account:signup') profile = Profile.objects.get(user=request.user) following = Follow.objects.filter(profile=profile).all() followed_topics = [] for topic in following: followed_topics.append(topic.topic) queries = [Q(topic=topic) for topic in followed_topics] if queries: query = queries.pop() for item in queries: query |= item questions = Question.objects.filter(query).order_by('-created_at').all() else: questions = [] paginator = Paginator(questions, 10) questions = paginator.page(page) print(questions) return render(request, "question/index.html", { "questions": questions, "following": followed_topics })
def guest_by_name(self, name, email, confidence=1): if not name: return self.none() normalized_name = normalize(name) attribs = ['name__startswith', 'data__nazwaskrocona'] q_f = reduce(lambda x, y: x | Q(**{y: name}) | Q(**{y: normalized_name}) | Q(**{y: name.upper()}) | Q(**{y: normalized_name.upper()}), attribs, Q()) if not email: return self.filter(q_f) domain = email.split('@', 2)[1].lower() q_f |= Q(data__adresstronyinternetowej=domain) | Q(data__adresstronyinternetowej="www.{}".format(domain)) q_f |= Q(data__adresstronyinternetowej=domain.upper()) | Q(data__adresstronyinternetowej="www.{}".format(domain).upper()) result = list(self.filter(q_f)) if result: return result all_possible = REGON.objects.exclude(~Q(Q(data__adresemail='') & Q(data__adresemail2=""))).iterator() return filter(lambda x: x.data.get('adresemail', '').upper().endswith("@" + domain) | x.data.get('adresemail2', '').upper().endswith("@" + domain), all_possible)
def handle(self, query, outfile, *args, **options): queries = [Q(name__icontains=x) | Q(regon__icontains=x) for x in query] q = reduce(lambda x, y: x | y, queries) f_csv = csv.writer(outfile) f_csv.writerow(['name', 'clean_name', 'regon14', 'regon9', 'region', 'terc']) for regon in REGON.objects.filter(q).select_related('regonjst', 'regonjst__jst').all(): name = regon.name clean_name = normalize(regon.name) regon14 = regon.data.get('regon14', '') regon9 = regon.data.get('regon9', '') if hasattr(regon, 'regonjst'): region = " > ".join(x.name for x in regon.regonjst.jst.get_ancestors(ascending=False, include_self=True)) teryt = regon.regonjst.jst_id else: region = '' teryt = '' f_csv.writerow((name, clean_name, regon14, regon9, region, teryt))
def search(request): """ ???? ???? 2017?6?17?16:11:15 :param request: :return: """ q = request.GET.get('q') error_msg = '' if not q: error_msg = '??????' return render(request, 'blog/index.html', {'error_msg': error_msg}) post_list = Post.objects.filter(Q(title__icontains=q) | Q(body__icontains=q)).filter(is_pub=True).filter( category__is_pub=True).order_by( '-create_time') return render(request, 'blog/index.html', {'post_list': post_list, 'error_msg': error_msg})
def handle_one_run(self): now = timezone.now() month_before = now - timedelta(days=30) report_due = Q(next_report_date__lt=now) report_not_scheduled = Q(next_report_date__isnull=True) q = Profile.objects.filter(report_due | report_not_scheduled) q = q.filter(reports_allowed=True) q = q.filter(user__date_joined__lt=month_before) sent = 0 for profile in q: if num_pinged_checks(profile) > 0: self.stdout.write(self.tmpl % profile.user.email) profile.send_report() sent += 1 return sent
def get_base_wf_permit_query_param(user, process_instance_field_prefix='pinstance__'): def p(param_name, value): return {process_instance_field_prefix + param_name: value} q_param = Q() # Submit q_param = q_param | Q( **p('created_by', user) ) # share q_param = q_param | Q( **p('can_view_users', user) ) # Can process q_param = q_param | Q( **p('task__user', user) ) q_param = q_param | Q( **p('task__agent_user', user) ) return q_param
def serialize(cls, user, program): """ Serializes financial aid info for a user in a program """ if not program.financial_aid_availability: return {} serialized = copy.copy(cls.default_serialized) financial_aid_qset = FinancialAid.objects.filter( Q(user=user) & Q(tier_program__program=program) ).exclude(status=FinancialAidStatus.RESET) serialized["has_user_applied"] = financial_aid_qset.exists() if serialized["has_user_applied"]: financial_aid = financial_aid_qset.first() serialized.update({ "application_status": financial_aid.status, "date_documents_sent": financial_aid.date_documents_sent, "id": financial_aid.id }) financial_aid_min_price, financial_aid_max_price = cls.get_program_price_range(program) serialized.update({ "min_possible_cost": financial_aid_min_price, "max_possible_cost": financial_aid_max_price }) return serialized
def get_program_price_range(cls, program): """ Returns the financial aid possible cost range """ course_max_price = program.price # get all the possible discounts for the program program_tiers_qset = TierProgram.objects.filter( Q(program=program) & Q(current=True)).order_by('discount_amount') if not program_tiers_qset.exists(): log.error('The program "%s" needs at least one tier configured', program.title) raise ImproperlyConfigured( 'The program "{}" needs at least one tier configured'.format(program.title)) min_discount = program_tiers_qset.aggregate( Min('discount_amount')).get('discount_amount__min', 0) max_discount = program_tiers_qset.aggregate( Max('discount_amount')).get('discount_amount__max', 0) return course_max_price - max_discount, course_max_price - min_discount
def get_searchable_programs(user, staff_program_ids): """ Determines the programs a user is eligible to search Args: user (django.contrib.auth.models.User): the user that is searching staff_program_ids (list of int): the list of program ids the user is staff for if any Returns: set(courses.models.Program): set of programs the user can search in """ # filter only to the staff programs or enrolled programs # NOTE: this has an accepted limitation that if you are staff on any program, # you can't use search on non-staff programs return set(Program.objects.filter( Query(id__in=staff_program_ids) if staff_program_ids else Query(programenrollment__user=user) ).distinct())
def visible_for_user(self, user): if not user.is_authenticated: return self.filter(visibility=Visibility.PUBLIC) if user.is_staff: return self return self.filter(Q(id=user.profile.id) | Q(visibility__in=[ Visibility.LIMITED, Visibility.SITE, Visibility.PUBLIC ]))