我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 django.db.models.query_utils.Q。
def delete_batch(self, pk_list, using, field=None):
    """
    Delete every object whose ``field`` value appears in ``pk_list``.

    The values are processed in slices of GET_ITERATOR_CHUNK_SIZE, so a long
    ``pk_list`` results in more than one physical query.

    ``field`` falls back to the model's primary key when it is not given.
    Returns the sum of the values returned by ``do_query`` for each slice.
    """
    if not field:
        field = self.get_meta().pk

    deleted_total = 0
    for start in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
        chunk = pk_list[start:start + GET_ITERATOR_CHUNK_SIZE]
        # Begin with an empty WHERE clause so only this slice is matched.
        self.where = self.where_class()
        lookup = field.attname + '__in'
        self.add_q(Q(**{lookup: chunk}))
        deleted_total += self.do_query(
            self.get_meta().db_table, self.where, using=using)
    return deleted_total
def test_populate(self):
    """Check that EdgeIndex.populate() records the couplet/controller parent-child edge."""
    from chroma_core.services.plugin_runner.resource_manager import EdgeIndex

    couplet_record, couplet_resource = self._make_global_resource(
        'example_plugin', 'Couplet', {'address_1': 'foo', 'address_2': 'bar'})
    controller_resource = self._make_local_resource(
        'example_plugin', 'Controller', index=0, parents=[couplet_resource])
    self.resource_manager.session_open(
        self.plugin,
        couplet_record.pk,
        [couplet_resource, controller_resource],
        60)

    # Leave out the couplet record and the plugin's own scannable record;
    # .get() raises unless exactly one record remains.
    not_couplet = ~Q(id=couplet_record.pk)
    not_plugin = ~Q(id=self.plugin._scannable_id)
    controller_record = StorageResourceRecord.objects.get(not_couplet, not_plugin)

    edge_index = EdgeIndex()
    edge_index.populate()
    self.assertEqual(edge_index.get_parents(controller_record.pk),
                     [couplet_record.pk])
    self.assertEqual(edge_index.get_children(couplet_record.pk),
                     [controller_record.pk])
def get_usable_luns(cls, queryset):
    """
    Narrow ``queryset`` to unused Luns that can be turned into a Target.

    Starting from ``cls.get_unused_luns(queryset)``, only VolumeNodes on hosts
    flagged ``not_deleted`` are considered. A Lun is kept when either:

    * it has exactly one VolumeNode (no HA, but the mount location is
      unambiguous), or
    * at least one of its VolumeNodes is marked primary (so the primary mount
      location is known).
    """
    unused = cls.get_unused_luns(queryset)
    on_live_hosts = unused.filter(volumenode__host__not_deleted=True)
    annotated = on_live_hosts.annotate(
        has_primary=BoolOr('volumenode__primary'),
        num_volumenodes=Count('volumenode'))

    single_node = Q(num_volumenodes=1)
    primary_known = Q(has_primary=True)
    return annotated.filter(single_node | primary_known)
def test_lock_already_locked_resource(self):
    """Lock an already locked resource & validate failure.

    * Validates the DB initial state.
    * Locks an already locked resource, using resource client.
    * Validates a ResourceUnavailableError is raised.
    """
    resources_num = DemoResourceData.objects.filter(
        ~Q(owner=""), name=self.LOCKED1_NAME).count()

    # assertEqual rather than the assertEquals alias: the alias is deprecated
    # and no longer exists on Python 3.12+.
    self.assertEqual(resources_num, 1, "Expected 1 locked "
                     "resource with name %r in DB found %d"
                     % (self.LOCKED1_NAME, resources_num))

    descriptor = Descriptor(DemoResource, name=self.LOCKED1_NAME)
    self.assertRaises(ResourceUnavailableError,
                      self.client._lock_resources,
                      descriptors=[descriptor],
                      timeout=self.LOCK_TIMEOUT)
def query_resources(self, request):
    """Find and return the resources that answer the client's query.

    Args:
        request (Request): QueryResources request.

    Returns:
        ResourcesReply. a reply containing matching resources.

    Raises:
        ResourceDoesNotExistError: no usable resource matches the descriptor.
    """
    desc = ResourceDescriptor.decode(request.message.descriptors)
    self.logger.debug("Looking for resources with description %r", desc)

    # query for resources that are usable and match the descriptors
    query = Q(is_usable=True, **desc.properties)

    # Evaluate the queryset once; the previous count() followed by a separate
    # iteration hit the database twice for the same data.
    query_result = list(desc.type.objects.filter(query))
    if not query_result:
        raise ResourceDoesNotExistError("No existing resource meets "
                                        "the requirements: %r" % desc)

    return ResourcesReply(resources=query_result)
def complex_filter(self, filter_obj):
    """
    Return a clone of this QuerySet with ``filter_obj`` applied.

    ``filter_obj`` is either a Q object (or any object exposing an
    ``add_to_query()`` method) or a dictionary of keyword lookups.

    This hook backs framework features such as 'limit_choices_to'; regular
    code will usually find the other filtering methods more natural.
    """
    q_like = isinstance(filter_obj, Q) or hasattr(filter_obj, 'add_to_query')
    if not q_like:
        # Plain mapping of lookups: delegate to the keyword-based path.
        return self._filter_or_exclude(None, **filter_obj)

    clone = self._clone()
    clone.query.add_q(filter_obj)
    return clone
def delete_batch(self, pk_list, using):
    """
    Run delete queries covering every primary key in ``pk_list``.

    Keys are handled GET_ITERATOR_CHUNK_SIZE at a time, so several physical
    queries may be issued for a long list. Returns the accumulated result of
    ``do_query`` over all chunks.
    """
    pk_field = self.get_meta().pk
    total = 0
    for lower in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
        upper = lower + GET_ITERATOR_CHUNK_SIZE
        # Reset the WHERE clause so it only describes the current chunk.
        self.where = self.where_class()
        self.add_q(Q(**{pk_field.attname + '__in': pk_list[lower:upper]}))
        total += self.do_query(self.get_meta().db_table, self.where, using=using)
    return total
def _remove_registry(self, name: Optional[str]=None, client_id: Optional[str]=None, force: bool=False) -> None:
    """Delete the single docker registry matching ``name`` and/or ``client_id``.

    The criteria are OR-ed together. The process exits with status 1 when no
    registry or more than one registry matches, or when the user declines the
    confirmation prompt. ``force`` skips the prompt.
    """
    query = Q()
    if name:
        query |= Q(name__exact=name)
    if client_id:
        query |= Q(client_id__exact=client_id)
    # NOTE(review): with neither criterion given the empty Q() matches every
    # registry - confirm callers always pass at least one of them.
    registry = DockerRegistry.objects.filter(query)

    # Fetch at most two rows in a single query: that is enough to tell
    # "none" / "exactly one" / "several" apart, and all later checks and
    # messages then refer to the same snapshot instead of re-querying.
    matches = list(registry[:2])

    if not matches:
        self.stderr.write("No matching registry found for the given criteria.")
        sys.exit(1)
    elif len(matches) > 1:
        self.stderr.write("Criteria matched more than a single registry.")
        sys.exit(1)
    else:
        target = matches[0]
        self.stdout.write("\nRegistry-----------\nName: %s\nClient id: %s\n\n" % (target.name, target.client_id))
        if force or self._ask_confirmation("Really delete the above registry? [yN]", default=False):
            regname = target.name
            registry.delete()
            self.stderr.write(self.style.SUCCESS("Removed docker registry \"%s\"." % regname))
            return
        else:
            sys.exit(1)
def filter_list_queryset(self, request, queryset, view):
    """Restrict the channel list to what the requesting user may see.

    Anonymous users get an empty queryset. Staff/superusers see their own
    channels plus support and developer channels; developers see their own
    plus developer channels; everyone else sees only their own. Support
    channels are excluded unless a ``type`` query parameter is supplied.
    """
    user = request.user
    if not user.is_authenticated():
        return queryset.none()

    if user.is_staff or user.is_superuser:
        visible = (
            Q(channeluser__user=user) |
            Q(type=CHANNEL_TYPE_SUPPORT) |
            Q(type=CHANNEL_TYPE_DEVELOPER)
        )
        queryset = queryset.filter(visible)
    elif user.is_developer:
        visible = Q(channeluser__user=user) | Q(type=CHANNEL_TYPE_DEVELOPER)
        queryset = queryset.filter(visible)
    else:
        queryset = queryset.filter(channeluser__user=user)

    type_requested = request.query_params.get('type', None)
    if not type_requested:
        queryset = queryset.exclude(type=CHANNEL_TYPE_SUPPORT)
    return queryset
def filter_payment_status(self, queryset, name, value):
    """Filter closed items by payment status.

    ``value`` selects the status: 'paid', 'processing', 'pending' or
    'distribute'; any other value only applies the ``closed=True`` filter.
    For 'paid' and 'processing' the result depends on whether the requesting
    user is a project owner who is not an admin.
    """
    queryset = queryset.filter(closed=True)
    if value in ['paid', 'processing']:
        request = self.request
        is_po = request and request.user and request.user.is_authenticated() and request.user.is_project_owner and not request.user.is_admin
        if value == 'paid':
            # Explicit branch instead of `is_po and queryset or ...`: the
            # and/or form called bool(queryset), which evaluates the whole
            # queryset and falls through to the filter when it is empty.
            if is_po:
                return queryset
            return queryset.filter(paid=True, pay_distributed=True)
        else:
            processing_filter = (Q(processing=True) & Q(paid=False))
            if not is_po:
                # Non-owners also see items that are paid but not yet distributed.
                processing_filter = processing_filter | (Q(paid=True) & Q(pay_distributed=False))
            return queryset.filter(processing_filter)
    elif value == 'pending':
        queryset = queryset.filter(processing=False, paid=False)
    elif value == 'distribute':
        queryset = queryset.filter(
            payment_method=TASK_PAYMENT_METHOD_STRIPE, paid=True, btc_paid=False, pay_distributed=False
        )
    return queryset
def filter_list_queryset(self, request, queryset, view):
    """Apply the optional ``filter`` label and then per-user visibility.

    Labels: 'upcoming', 'complete'/'finished' and 'missed'; due dates are
    compared with a threshold 24 hours before the current UTC time. Staff and
    superusers see every remaining row; other users only see rows they
    created, rows on their own tasks, or rows on tasks they participate in
    with an initial or accepted status.
    """
    label_filter = request.query_params.get('filter', None)
    threshold_date = datetime.datetime.utcnow() - relativedelta(hours=24)

    has_report = {'progressreport__isnull': False}
    lookups_by_label = {
        'upcoming': {'due_at__gt': threshold_date, 'progressreport__isnull': True},
        'complete': has_report,
        'finished': has_report,
        'missed': {'due_at__lt': threshold_date, 'progressreport__isnull': True},
    }
    lookups = lookups_by_label.get(label_filter)
    if lookups is not None:
        queryset = queryset.filter(**lookups)

    user = request.user
    if user.is_staff or user.is_superuser:
        return queryset

    active_participation = (
        Q(task__participation__user=user) &
        Q(task__participation__status__in=[STATUS_INITIAL, STATUS_ACCEPTED])
    )
    return queryset.filter(
        Q(created_by=user) | Q(task__user=user) | active_participation
    )
def has_object_read_permission(self, request):
    """Decide whether the requesting user may read this object.

    Access is granted outright to a matching edit token, the owner, the
    parent's owner, admins and authenticated project managers. Otherwise the
    answer depends on ``self.visibility``; unknown visibilities deny access.
    """
    user = request.user

    # Unconditional grants, checked in the same order as the original chain.
    if str(self.edit_token) == get_edit_token_header(request) or user == self.user:
        return True
    if self.parent and user == self.parent.user:
        return True
    if self.has_admin_access(user):
        return True
    # NOTE: an extra restriction tying project managers to `self.pm` was
    # commented out in the original code and is deliberately not applied here.
    if user.is_authenticated() and user.is_project_manager:
        return True

    if self.visibility == VISIBILITY_DEVELOPER:
        return user.is_authenticated() and user.is_developer

    if self.visibility == VISIBILITY_MY_TEAM:
        either_direction = (
            Q(from_user=self.user, to_user=user) |
            Q(from_user=user, to_user=self.user)
        )
        connections = Connection.objects.exclude(status=STATUS_REJECTED).filter(either_direction)
        return bool(connections.count())

    if self.visibility == VISIBILITY_CUSTOM:
        # Returns the participation count itself (an int), as before.
        return self.subtask_participants_inclusive_filter.filter(
            user=user, status__in=[STATUS_INITIAL, STATUS_ACCEPTED]
        ).count()

    return False
def for_user(self, user):
    """Return rows whose group and user assignment are each either empty or matching ``user``."""
    group_matches = (
        Q(assigned_to_group__isnull=True) |
        Q(assigned_to_group__in=user.groups.all())
    )
    user_matches = (
        Q(assigned_to_user__isnull=True) |
        Q(assigned_to_user=user)
    )
    # Positional Q arguments are AND-ed together by filter().
    return self.filter(group_matches, user_matches)
def delete_qs(self, query, using):
    """
    Delete everything matched by ``query``, in one SQL statement if possible.

    When the inner query touches only the base table its WHERE clause is
    reused directly. Otherwise the rows are selected through a ``pk__in``
    subquery, or - when the backend cannot self-select during an update -
    by fetching the primary keys and delegating to ``delete_batch``.

    Returns the cursor's rowcount (0 when no cursor is returned), or the
    result of ``delete_batch`` on the fallback path.
    """
    inner = query.query
    # Both queries need at least one table alias in use before comparing.
    inner.get_initial_alias()
    self.get_initial_alias()

    tables_in_use = [t for t in inner.tables if inner.alias_refcount[t]]
    only_base_table = not tables_in_use or tables_in_use == self.tables

    if only_base_table:
        self.where = inner.where
    else:
        pk = query.model._meta.pk
        if connections[using].features.update_can_self_select:
            # Turn the inner query into a subquery selecting just the pk.
            inner.clear_select_clause()
            inner.select = [pk.get_col(self.get_initial_alias())]
            values = inner
        else:
            # Subquery not usable here: materialise the pks and batch-delete.
            values = list(query.values_list('pk', flat=True))
            if not values:
                return 0
            return self.delete_batch(values, using)
        self.where = self.where_class()
        self.add_q(Q(pk__in=values))

    cursor = self.get_compiler(using).execute_sql(CURSOR)
    if cursor:
        return cursor.rowcount
    return 0
def update_batch(self, pk_list, values, using):
    """Apply ``values`` to the rows in ``pk_list``, GET_ITERATOR_CHUNK_SIZE keys per query."""
    self.add_update_values(values)
    for start in range(0, len(pk_list), GET_ITERATOR_CHUNK_SIZE):
        chunk = pk_list[start:start + GET_ITERATOR_CHUNK_SIZE]
        # Fresh WHERE clause per chunk so earlier chunks are not re-matched.
        self.where = self.where_class()
        self.add_q(Q(pk__in=chunk))
        compiler = self.get_compiler(using)
        compiler.execute_sql(NO_RESULTS)
def __init__(self, condition=None, then=None, **lookups):
    """Build the clause from either a Q object or keyword lookups (not both).

    Raises:
        TypeError: if neither form is given, if ``condition`` is not a Q
            object, or if both a condition and lookups are supplied.
    """
    if condition is None and lookups:
        # Keyword form: fold the lookups into a single Q condition.
        condition = Q(**lookups)
        lookups = None

    # A None condition is not a Q instance, so one isinstance check covers
    # both the "missing" and the "wrong type" cases.
    if lookups or not isinstance(condition, Q):
        raise TypeError("__init__() takes either a Q object or lookups as keyword arguments")

    super(When, self).__init__(output_field=None)
    self.condition = condition
    parsed = self._parse_expressions(then)
    self.result = parsed[0]
def deployments(self):
    """Return the rows whose action type is an equipment or instrument deployment."""
    equipment = Q(action_type='Equipment deployment')
    instrument = Q(action_type='Instrument deployment')
    return self.filter(equipment | instrument)
def get_query(self, q, request):
    """Return up to 20 objects whose translated title or lead-in contains ``q`` (case-insensitive), ordered by translated slug."""
    in_title = Q(translations__title__icontains=q)
    in_lead_in = Q(translations__lead_in__icontains=q)
    matches = self.model.objects.filter(in_title | in_lead_in)
    ordered = matches.order_by('translations__slug')
    return ordered[:20]
def get_query(self, q, request):
    """Return up to 20 objects whose translated name, or user's first/last name, contains ``q`` (case-insensitive), ordered by translated name."""
    in_name = Q(translations__name__icontains=q)
    in_first_name = Q(user__first_name__icontains=q)
    in_last_name = Q(user__last_name__icontains=q)
    matches = self.model.objects.filter(in_name | in_first_name | in_last_name)
    ordered = matches.order_by('translations__name')
    return ordered[:20]
def get_query(self, q, request):
    """Return up to 20 objects whose translated name or description contains ``q`` (case-insensitive), ordered by translated name."""
    in_name = Q(translations__name__icontains=q)
    in_description = Q(translations__description__icontains=q)
    matches = self.model.objects.filter(in_name | in_description)
    ordered = matches.order_by('translations__name')
    return ordered[:20]
def save(self, *args, **kwargs):
    """Save the host unless another ManagedHost already uses the same FQDN.

    Raises:
        IntegrityError: if a ManagedHost with a different pk has this fqdn.
    """
    # exists() instead of get(): the uniqueness check no longer leaks
    # MultipleObjectsReturned when several other rows share the FQDN, and the
    # raise is no longer buried inside a try body.
    fqdn_taken = ManagedHost.objects.filter(~Q(pk=self.pk), fqdn=self.fqdn).exists()
    if fqdn_taken:
        raise IntegrityError("FQDN %s in use" % self.fqdn)

    super(ManagedHost, self).save(*args, **kwargs)
def populate(self):
    """Register a parent edge for every (record id, parent id) pair of records that have parents."""
    with_parents = StorageResourceRecord.objects.filter(~Q(parents=None))
    for row in with_parents.values('id', 'parents'):
        self.add_parent(row['id'], row['parents'])
def attempts(request, contest_id):
    """Render the attempts of a task-based contest, optionally narrowed by a search pattern.

    The pattern comes from the ``AttemptsSearchForm`` bound to the GET
    parameters and is matched case-insensitively against the task name, the
    author's username / first name / last name, the team name and the answer.
    Responds with 404 when the contest does not exist.
    """
    contest = get_object_or_404(models.TaskBasedContest, pk=contest_id)
    attempt_list = contest.attempts.order_by('-created_at').select_related(
        'task', 'participant', 'participant__teamparticipant',
        'participant__individualparticipant', 'author'
    )

    # Default up front: previously `pattern` was only bound when the form
    # validated, so an invalid form made the render() call below fail with
    # UnboundLocalError.
    pattern = ''
    form = forms.AttemptsSearchForm(data=request.GET)
    if form.is_valid():
        pattern = form.cleaned_data['pattern']
        if pattern != '':
            attempt_list = attempt_list.filter(
                Q(task__name__icontains=pattern) |
                Q(author__username__icontains=pattern) |
                Q(author__first_name__icontains=pattern) |
                Q(author__last_name__icontains=pattern) |
                Q(participant__teamparticipant__team__name__icontains=pattern) |
                Q(answer__icontains=pattern)
            )

    return render(request, 'contests/attempts.html', {
        'current_contest': contest,
        'contest': contest,
        'pattern': pattern,
        'attempts': attempt_list,
        'form': form,
    })
def get_open_tasks(self, participant):
    """Return the task ids manually opened in this contest for everyone or for ``participant``."""
    opened_in_contest = ManualOpenedTask.objects.filter(contest=self.contest)
    for_everyone = Q(participant__isnull=True)
    for_this_participant = Q(participant=participant)
    visible = opened_in_contest.filter(for_everyone | for_this_participant)
    return visible.values_list('task_id', flat=True)
def published(self, for_user=UNSET, force_exchange=False):
    """
    Extend the parent ``published()`` filtering with the publication date
    fields used by Fluent.

    When ``for_user`` is supplied (anything but the UNSET sentinel) the
    result of ``self.visible()`` is returned instead.
    """
    if for_user is not UNSET:
        return self.visible()

    queryset = super(PublishingUrlNodeQuerySet, self).published(
        for_user=for_user, force_exchange=force_exchange)

    # Date checks must look at the published version of an item, never the
    # draft, otherwise the wrong result could be returned.

    # Draft rows: test the dates of the linked published copy.
    linked_outside_window = Q(
        Q(publishing_linked__publication_date__gt=now()) |
        Q(publishing_linked__publication_end_date__lte=now()))
    queryset = queryset.exclude(
        Q(publishing_is_draft=True) & linked_outside_window)

    # Published rows: test the dates stored directly on the row.
    own_outside_window = Q(
        Q(publication_date__gt=now()) |
        Q(publication_end_date__lte=now()))
    queryset = queryset.exclude(
        Q(publishing_is_draft=False) & own_outside_window)

    return queryset
def published(self, for_user=None, force_exchange=True):
    """
    Variant of ``UrlNodeQuerySet.published()`` that adds publication date
    constraints and can exchange draft items for their published versions.
    """
    qs = self._single_site()

    # In a draft request context with `for_user` present (i.e. the call was
    # triggered by Fluent) skip all filtering: privileged users may need to
    # be handed draft items here.
    if for_user and is_draft_request_context():
        return qs

    # Staff users are not restricted by publication dates.
    staff_user = for_user is not None and for_user.is_staff
    if not staff_user:
        already_started = (
            Q(publication_date__isnull=True) |
            Q(publication_date__lt=now())
        )
        qs = qs.filter(already_started)
        not_yet_ended = (
            Q(publication_end_date__isnull=True) |
            Q(publication_end_date__gte=now())
        )
        qs = qs.filter(not_yet_ended)

    if not force_exchange:
        return qs.filter(status=UrlNode.PUBLISHED)
    return _exchange_for_published(qs)
def get_context_data(self, **kwargs):
    """Add the edited user plus the default and custom permission querysets to the template context."""
    context = super(EditarPermissoesUsuarioView, self).get_context_data(**kwargs)
    context['user'] = User.objects.get(pk=self.kwargs['pk'])

    # OR together one case-insensitive "contains" condition per codename prefix.
    prefixes = ['add_', 'change_', 'view_', 'delete_']
    conditions = [Q(codename__icontains=prefix) for prefix in prefixes]
    condition = conditions[0]
    for extra in conditions[1:]:
        condition = operator.or_(condition, extra)

    context['default_permissions'] = Permission.objects.filter(
        condition, content_type__model__in=DEFAULT_PERMISSION_MODELS)
    context['custom_permissions'] = Permission.objects.filter(
        codename__in=CUSTOM_PERMISSIONS)
    return context
def search_keyword(request):
    """Return, as a JSON document, the id and name of every course matching the posted ``word``.

    A course matches when its own name, or the course name reached through
    its search keywords, contains ``word`` (case-insensitive). The response
    body has the shape ``{"keywords": [{"id": ..., "name": ...}, ...]}``.
    """
    word = request.POST.get('word')
    if word is None:
        # No search term posted. The old blanket `except Exception: pass`
        # left `courses` unbound on this path and the view then died with a
        # NameError; answer with an empty result instead.
        result_course = []
    else:
        courses = Course.objects.filter(
            Q(search_keywords__course__name__icontains=word) |
            Q(name__icontains=word)
        ).distinct().values("id", "name")
        result_course = list(courses)

    keywords = {"keywords": result_course}
    return HttpResponse(json.dumps(keywords))
def get_q(self, qualifier, value, invert, partial=''):
    """Build the Q object for this field from a qualifier and a raw value.

    'in' and 'range' split ``value`` on commas ('range' needs exactly two
    parts); 'isnull' always uses True; anything else cleans the single value.
    The lookup key is ``partial`` + field name + ``__qualifier`` (no suffix
    for an empty qualifier). The result is negated when ``invert`` is truthy.

    Raises:
        BinderRequestError: 'range' was not given exactly two values.
        ValidationError: cleaning the single value raised IndexError.
    """
    self.check_qualifier(qualifier)

    # TODO: Try to make the splitting and cleaning more re-usable
    multi_valued = qualifier in ('in', 'range')
    if multi_valued:
        values = value.split(',')
        if qualifier == 'range' and len(values) != 2:
            raise BinderRequestError('Range requires exactly 2 values for {}.'.format(self.field_description()))
    else:
        values = [value]

    if qualifier == 'isnull':
        cleaned_value = True
    elif multi_valued:
        cleaned_value = [self.clean_value(qualifier, v) for v in values]
    else:
        try:
            cleaned_value = self.clean_value(qualifier, values[0])
        except IndexError:
            raise ValidationError('Value for filter {{{}}}.{{{}}} may not be empty.'.format(self.field.model.__name__, self.field.name))

    suffix = '__' + qualifier if qualifier else ''
    lookup = partial + self.field.name + suffix
    condition = Q(**{lookup: cleaned_value})
    return ~condition if invert else condition
def test_lock_unavailable_resource_timeout(self):
    """Lock an already locked resource & validate failure after timeout.

    * Validates the DB initial state.
    * Locks an already locked resource, using resource client.
    * Validates a ResourceUnavailableError is raised.
    * Validates 'lock_resources' duration is greater than the timeout.
    """
    resources_num = DemoResourceData.objects.filter(
        ~Q(owner=""), name=self.LOCKED1_NAME).count()

    # assertEqual rather than the assertEquals alias: the alias is deprecated
    # and no longer exists on Python 3.12+.
    self.assertEqual(resources_num, 1, "Expected 1 locked "
                     "resource with name %r in DB found %d"
                     % (self.LOCKED1_NAME, resources_num))

    descriptor = Descriptor(DemoResource, name=self.LOCKED1_NAME)

    start_time = time.time()
    self.assertRaises(ResourceUnavailableError,
                      self.client._lock_resources,
                      descriptors=[descriptor],
                      timeout=self.LOCK_TIMEOUT)
    duration = time.time() - start_time

    # The client must have kept waiting for the whole timeout before failing.
    self.assertGreaterEqual(duration, self.LOCK_TIMEOUT, "Waiting for "
                            "resources took %.2f seconds, but should take "
                            "at least %d" % (duration, self.LOCK_TIMEOUT))
def test_lock_multiple_matches(self):
    """Lock a resource, parameters matching more than one result.

    * Validates the DB initial state.
    * Locks a resource using parameters that match more than one
      resource, using resource client.
    * Validates only one resource returned.
    * Validates the returned resource is now marked as locked.
    * Validates there is still 1 available resource with same parameters.
    """
    # NOTE: assertEqual is used throughout instead of the assertEquals alias,
    # which is deprecated and no longer exists on Python 3.12+.
    common_parameters = {'ip_address': "1.1.1.1"}

    resources_num = DemoResourceData.objects.filter(
        owner="", **common_parameters).count()
    self.assertEqual(resources_num, 2, "Expected 2 available "
                     "resources with parameters %r in DB found %d"
                     % (common_parameters, resources_num))

    descriptor = Descriptor(DemoResource, **common_parameters)
    resources = self.client._lock_resources(descriptors=[descriptor],
                                            timeout=self.LOCK_TIMEOUT)

    resources_num = len(resources)
    self.assertEqual(resources_num, 1, "Expected list with 1 "
                     "resource in it but found %d" % resources_num)

    locked_resource_name = resources[0].name

    # The returned resource must now have a non-empty owner in the DB.
    resources_num = descriptor.type.DATA_CLASS.objects.filter(
        ~Q(owner=""), name=locked_resource_name).count()
    self.assertEqual(resources_num, 1, "Expected 1 locked "
                     "resource with name %r in DB, found %d"
                     % (locked_resource_name, resources_num))

    resources_num = descriptor.type.DATA_CLASS.objects.filter(
        owner="", **common_parameters).count()
    self.assertGreaterEqual(resources_num, 1, "Expected at least 1 "
                            "available resource with the same parameters "
                            "in DB found %d" % resources_num)
def get_copy_languages(self, placeholder, model, fieldname, **kwargs):
    """List the other languages of the same master object that have plugins in ``fieldname``.

    Returns a list of ``(language_code, language_name)`` tuples, with names
    looked up in ``settings.LANGUAGES``. The source object itself (the one
    whose ``fieldname`` is ``placeholder``) is excluded.
    """
    manager = model.objects
    src = manager.get(**{fieldname: placeholder})

    query = Q(master=src.master)
    query &= Q(**{'%s__cmsplugin__isnull' % fieldname: False})
    query &= ~Q(pk=src.pk)

    language_codes = manager.filter(query).values_list('language_code', flat=True).distinct()

    # Build the code -> name mapping once rather than once per result row.
    language_names = dict(settings.LANGUAGES)
    return [(lc, language_names[lc]) for lc in language_codes]
def _filter_or_exclude(self, negate, *args, **kwargs):
    """Return a clone with the given lookups added, negated when ``negate`` is truthy."""
    has_conditions = bool(args or kwargs)
    if has_conditions:
        assert self.query.can_filter(), \
            "Cannot filter a query once a slice has been taken."

    clone = self._clone()
    condition = Q(*args, **kwargs)
    clone.query.add_q(~condition if negate else condition)
    return clone