我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用django.core.exceptions.MultipleObjectsReturned()。
def add_tag(ticket_id, body, user):
    """
    Attach an existing tag to a ticket and record the action in the log.

    :param ticket_id: id of the ticket receiving the tag
    :param body: lookup fields passed to ``Tag.objects.get``
    :param user: user passed on to the ticket action log
    :return: dict with a confirmation message
    :raises BadRequest: ambiguous tag lookup, or tag type differs from the
        ticket's class name
    :raises NotFound: tag or ticket could not be resolved
    """
    try:
        tag = Tag.objects.get(**body)
        ticket = Ticket.objects.get(id=ticket_id)

        # The tag's declared type must equal the model class name.
        model_name = ticket.__class__.__name__
        if model_name != tag.tagType:
            raise BadRequest('Invalid tag for ticket')

        ticket.tags.add(tag)
        ticket.save()
        database.log_action_on_ticket(
            ticket=ticket,
            action='add_tag',
            user=user,
            tag_name=tag.name
        )
    except MultipleObjectsReturned:
        # `body` matched several tags; the caller has to be more specific.
        raise BadRequest('Please use tag id')
    except (KeyError, FieldError, IntegrityError, ObjectDoesNotExist, ValueError):
        raise NotFound('Tag or ticket not found')

    return {'message': 'Tag successfully added'}
def add_tag(report_id, body):
    """
    Attach an existing tag to a report.

    :param report_id: id of the report receiving the tag
    :param body: lookup fields passed to ``Tag.objects.get``
    :return: dict with a confirmation message
    :raises BadRequest: ambiguous tag lookup, or tag type differs from the
        report's class name
    :raises NotFound: report or tag could not be resolved
    """
    try:
        tag = Tag.objects.get(**body)
        report = Report.objects.get(id=report_id)

        # The tag's declared type must equal the model class name.
        model_name = report.__class__.__name__
        if model_name != tag.tagType:
            raise BadRequest('Invalid tag for report')

        report.tags.add(tag)
        report.save()
    except MultipleObjectsReturned:
        # `body` matched several tags; the caller has to be more specific.
        raise BadRequest('Please use tag id')
    except (KeyError, FieldError, IntegrityError, ObjectDoesNotExist, ValueError):
        raise NotFound('Report or tag not found')

    return {'message': 'Tag successfully added'}
def get(self, request, course_id, lesson_id, *args, **kwargs):
    """
    Render the quiz form for a lesson.

    Responds with 404 when the lesson, or any practical test for it, is
    missing. Each question of the lesson is mapped to the queryset of its
    answer options and handed to ``QuizForm`` as ``extra``.
    """
    lesson = get_object_or_404(Lesson, id=lesson_id)

    # Existence check only: zero matches -> 404, several matches are fine.
    try:
        get_object_or_404(PracticalTest, theoretical_part=lesson)
    except MultipleObjectsReturned:
        pass

    quiz_data = {}
    for question in PracticalTest.objects.filter(theoretical_part=lesson):
        answers = AnswerOption.objects.filter(practical_test=question)
        quiz_data[question.question] = answers

    template_dict = {'form': QuizForm(extra=quiz_data)}
    return render(request, self.template_name, template_dict)
def get(self, request, *args, **kwargs):
    """
    Activate the user whose ``hash_username`` equals the ``key`` query
    parameter.

    Raises ``Http404`` when no user matches and ``ValidationError`` when the
    key is missing from the query string or matches several users.
    """
    try:
        activation_key = request.query_params['key']
        user = MomoUser.objects.get(hash_username=activation_key)
        user.is_active = True
        user.save()
    except MomoUser.DoesNotExist:
        raise Http404
    except (MultipleObjectsReturned, MultiValueDictKeyError):
        # Ambiguous key and missing key are reported with the same message.
        raise ValidationError(detail="?? ?? url? ?? ?????.")

    payload = {
        "user_pk": user.pk,
        "detail": "user? ????????.",
        "url": reverse('index', request=request),
    }
    return Response(payload,
                    status=status.HTTP_200_OK,
                    template_name='member/activate.html')
def get_detail(self, request, **kwargs):
    """
    Return one serialized resource.

    The object is fetched through ``cached_obj_get``; a missing object gives
    ``HttpNotFound`` and an ambiguous lookup gives ``HttpMultipleChoices``.
    Otherwise the object is dehydrated and returned via ``create_response``.
    """
    lookup_bundle = self.build_bundle(request=request)

    try:
        obj = self.cached_obj_get(
            bundle=lookup_bundle,
            **self.remove_api_resource_names(kwargs)
        )
    except ObjectDoesNotExist:
        return http.HttpNotFound()
    except MultipleObjectsReturned:
        return http.HttpMultipleChoices("More than one resource is found at this URI.")

    detail_bundle = self.full_dehydrate(self.build_bundle(obj=obj, request=request))
    detail_bundle = self.alter_detail_data_to_serialize(request, detail_bundle)
    return self.create_response(request, detail_bundle)
def obj_get(self, bundle, **kwargs):
    """
    ORM-backed ``obj_get``: return the single instance matching ``kwargs``.

    Raises the model's ``DoesNotExist`` when nothing matches,
    ``MultipleObjectsReturned`` when several objects match, and ``NotFound``
    when the lookup raises ``ValueError``.
    """
    try:
        matches = self.get_object_list(bundle.request).filter(**kwargs)
        lookup_text = ', '.join("%s=%s" % (name, value) for name, value in kwargs.items())
        model_name = self._meta.object_class.__name__

        match_count = len(matches)
        if match_count < 1:
            raise self._meta.object_class.DoesNotExist(
                "Couldn't find an instance of '%s' which matched '%s'." % (model_name, lookup_text))
        if match_count > 1:
            raise MultipleObjectsReturned(
                "More than '%s' matched '%s'." % (model_name, lookup_text))

        bundle.obj = matches[0]
        self.authorized_read_detail(matches, bundle)
        return bundle.obj
    except ValueError:
        raise NotFound("Invalid resource lookup data provided (mismatched type).")
def test_get_one_does_not_iterate_long_sequence_indefinitely(self):
    """get_one must fail fast instead of exhausting a huge sequence."""

    class InfinityException(Exception):
        """Iteration went on indefinitely."""

    def infinite_sequence():
        """Generator: yield a few items, then fail if iterated any further."""
        counter = 0
        while counter < 3:
            yield counter
            counter += 1
        raise InfinityException()

    # MultipleObjectsReturned must surface before the generator is drained;
    # reaching its end would raise InfinityException instead.
    with self.assertRaises(MultipleObjectsReturned):
        get_one(infinite_sequence())
def delete_model(self, request, obj):
    """
    Delete ``obj`` if it already lives under the bin page; otherwise move it
    into a dated bucket page beneath the bin.

    The bin page and the bucket page are created on demand. If several pages
    share the bin or bucket title, the first match is used.
    """
    # Retrieve the bin page or create it
    try:
        bin_page = Page.objects.get(title_set__title=BIN_PAGE_NAME)
    except ObjectDoesNotExist:
        bin_page = api.create_page(BIN_PAGE_NAME, constants.TEMPLATE_INHERITANCE_MAGIC, BIN_PAGE_LANGUAGE)
    except MultipleObjectsReturned:
        bin_page = Page.objects.filter(title_set__title=BIN_PAGE_NAME).first()

    # Is the page already under the bin folder? Walk up through the parents.
    is_in_bin = False
    node = obj
    while node:
        if node.title_set.filter(title=BIN_PAGE_NAME).exists():
            is_in_bin = True
            break
        node = node.parent

    # if yes -> delete it
    if is_in_bin:
        obj.delete()
        bin_page.fix_tree()
        return

    # else -> move it to the bin folder.
    # The bin is split into buckets (too many children will slow the
    # javascript down).
    bucket_title = datetime.datetime.now().strftime(BIN_BUCKET_NAMING)
    try:
        bucket = Page.objects.get(title_set__title=bucket_title)
    except ObjectDoesNotExist:
        bucket = api.create_page(bucket_title, constants.TEMPLATE_INHERITANCE_MAGIC, BIN_PAGE_LANGUAGE, parent=bin_page)
    except MultipleObjectsReturned:
        # Same recovery as for the bin page: reuse the first duplicate
        # instead of failing the whole delete.
        bucket = Page.objects.filter(title_set__title=bucket_title).first()

    obj.move_page(bucket)
    bin_page.fix_tree()
    obj.fix_tree()
    bucket.fix_tree()
def get_tree(self, request):
    """
    Return the HTML rows for the children of the page given by ``pageId``,
    or for the draft root nodes of the site when no ``pageId`` is supplied.

    Used for lazy loading pages in cms.pagetree.js. The site comes from the
    ``site`` GET parameter, falling back to the current site when that value
    is missing, not an integer, or does not identify exactly one site.
    """
    page_id = request.GET.get('pageId', None)
    site_id = request.GET.get('site', None)

    try:
        site = Site.objects.get(id=int(site_id))
    except (TypeError, ValueError, MultipleObjectsReturned, ObjectDoesNotExist):
        site = get_current_site(request)

    if page_id:
        parent_page = get_object_or_404(self.model, pk=int(page_id))
        pages = parent_page.get_children()
    else:
        root_nodes = Page.get_root_nodes()
        pages = root_nodes.filter(site=site, publisher_is_draft=True)
        # (disabled) .exclude(title_set__title__startswith='X')

    pages = pages.select_related('parent', 'publisher_public', 'site')
    pages = pages.prefetch_related('children')

    rows_html = render_admin_rows(request, pages, site=site, filtered=False)
    return HttpResponse(rows_html)
def test_get_social_auth(self):
    """
    get_social_auth returns the user's edX social auth object and raises
    when the user has more than one of them.
    """
    actual = get_social_auth(self.user)
    expected = self.user.social_auth.get(provider=EdxOrgOAuth2.name)
    assert actual == expected

    # A second social auth record makes the lookup ambiguous.
    UserSocialAuthFactory.create(user=self.user, uid='other name')
    self.assertRaises(MultipleObjectsReturned, get_social_auth, self.user)
def get_or_create_service(service_infos):
    """
    Return the service matching ``service_infos``, creating it if needed.

    Only keys listed in ``SERVICE_FIELDS`` are used. When several rows
    already match, the first of those matching rows is returned.

    :param service_infos: mapping of candidate field names to values
    :return: the ``Service`` instance
    """
    # .items() works on both Python 2 and 3 (iteritems() is Python 2 only).
    valid_infos = {
        key: value
        for key, value in service_infos.items()
        if key in SERVICE_FIELDS
    }

    try:
        service, _ = Service.objects.get_or_create(**valid_infos)
    except MultipleObjectsReturned:
        # Use the same criteria that proved ambiguous. Filtering on 'name'
        # alone could return a row that differs in the other fields, and
        # raised KeyError when 'name' was not supplied.
        service = Service.objects.filter(**valid_infos)[0]
    return service
def get(self, **kwargs):
    """
    Fetch exactly one object, in the spirit of the Django ORM's QuerySet.get.

    :param kwargs: the same keyword filters that ``filter`` accepts
    :return: a model instance flagged as persisted
    :raises self.model.DoesNotExist: the listing reports a count of zero
    :raises MultipleObjectsReturned: the listing reports two or more matches
    """
    params = self.params.copy()
    params.update(kwargs)
    params = self._preprocess_filter_params(params)
    pk = self.model._primary_key()

    lookup_names = set(params)
    lookup_names.discard('select_related')

    if lookup_names == {pk}:
        # Only the primary key (plus optionally select_related) was given:
        # address the single-resource URL directly.
        url = self.model._resource_url(params[pk])
        del params[pk]
        call_kwargs = {'params': params} if params else {}
        result = self.model._rest_call(url, **call_kwargs)
    else:
        url = self.model._resources_url()
        params['limit'] = 1
        listing = self.model._rest_call(url, params=params)
        count = listing['count']
        if count == 0:
            raise self.model.DoesNotExist(
                "%s matching query does not exist." % self.model.__name__
            )
        if count >= 2:
            raise MultipleObjectsReturned(
                "get() returned more than one %s -- it returned %d!" % (self.model.__name__, count)
            )
        result = listing['results'][0]

    instance = self.model(**result)
    instance._persisted = True
    return instance
def get(self, **kwargs):
    """
    Return the only result of the queryset filtered by ``kwargs``.

    Raises ``MultipleObjectsReturned`` when more than one result comes back.
    NOTE(review): an empty result is not handled here; indexing it
    presumably raises IndexError -- confirm against get_result().
    """
    filtered = self.filter(**kwargs)
    rows = filtered.get_result()
    if len(rows) <= 1:
        return rows[0]
    raise MultipleObjectsReturned(
        'get() returned more than one result, it returned {}'.format(filtered.count()))
def test_get_call(self):
    """
    get() on a queryset whose backend reports ten results must raise
    MultipleObjectsReturned and must forward the filter as request params.
    """
    # Materialise the ranges as lists: on Python 3 a list never compares
    # equal to a range object, so the equality check could not pass.
    fake_response = MagicMock(json=lambda: {'count': 10, 'results': list(range(10))})
    with patch('rest_framework_queryset.queryset.requests.get', return_value=fake_response) as mock_get:
        qs = RestFrameworkQuerySet('/api/')
        with self.assertRaises(MultipleObjectsReturned):
            qs.get(a=123)
        self.assertEqual(list(qs), list(range(10)))
        mock_get.assert_any_call('/api/', params={'a': 123})
def _create_item_transfers(self, sum_item_amounts, error_items=()):
    """
    Create ItemTransfers to alter inventory amounts.

    :param sum_item_amounts: mapping of item -> dict holding at least
        ``'amount'`` and optionally ``'barcode'`` / ``'coordinates'``
    :param error_items: items whose transfer must be flagged as invalid
    :return: list of ``ItemTransfer`` objects (the new ones are not saved here)
    """
    transfers = []
    for item, amount in sum_item_amounts.items():
        try:
            # presumably amount['amount'] is a unit-carrying quantity whose
            # short format is "<magnitude> <symbol>" -- TODO confirm
            amount_symbol = '{:~}'.format(amount['amount']).split(' ')[1]
            measure = AmountMeasure.objects.get(symbol=amount_symbol)
            amount['amount'] = amount['amount'].magnitude
        except Exception:
            # Best-effort fallback to the generic 'item' measure. Catching
            # Exception (not a bare except) keeps KeyboardInterrupt and
            # SystemExit propagating.
            measure = AmountMeasure.objects.get(symbol='item')

        # Look up to see if there is a matching ItemTransfer already and then
        # use this instead
        try:
            transfer = ItemTransfer.objects.get(item=item,
                                                barcode=amount.get('barcode', None),
                                                coordinates=amount.get('coordinates', None))
            # At this point need to subtract amount from available in existing
            # transfer! Need to mark in some way not completed though so can put
            # back if the transfer is cancelled
            transfer.amount_to_take = amount['amount']
        except (ObjectDoesNotExist, MultipleObjectsReturned):
            transfer = ItemTransfer(
                item=item,
                barcode=amount.get('barcode', None),
                coordinates=amount.get('coordinates', None),
                amount_taken=amount['amount'],
                amount_measure=measure)

        if item in error_items:
            transfer.is_valid = False
        transfers.append(transfer)
    return transfers
def get_object_or_403(model, **kwargs):
    """
    Return the single ``model`` instance matching ``kwargs``.

    Raises ``PermissionDenied`` when no object or more than one object
    matches.
    """
    try:
        found = model.objects.get(**kwargs)
    except (ObjectDoesNotExist, MultipleObjectsReturned):
        raise PermissionDenied
    return found
def clean_email(self):
    """
    Validate that no existing user has this email (case-insensitive).

    Returns the email when it is unused; raises ``forms.ValidationError``
    when one or more users already have it.
    """
    email = self.cleaned_data.get("email", "")
    try:
        auth.models.User.objects.get(email__iexact=email)
    except auth.models.User.DoesNotExist:  # @UndefinedVariable
        return email
    except MultipleObjectsReturned:
        # Several users share the address: it is taken all the same.
        pass
    raise forms.ValidationError(_("A user using that email address already exists."))
def post(self, request):
    """
    User registration JSON API endpoint.
    ---
    request_serializer: UserRegisterSerializer
    """
    serializer = UserRegisterSerializer(data=request.data)
    if not serializer.is_valid():
        return serializer_invalid_response(serializer)

    data = serializer.data
    if not Captcha(request).check(data["captcha"]):
        return error_response(u"?????")

    # Reject the request when the username is already taken.
    try:
        User.objects.get(username=data["username"])
    except User.DoesNotExist:
        pass
    else:
        return error_response(u"??????")

    # Reject the request when the email is already in use, by one user or
    # by several; otherwise create the account and its profile.
    try:
        User.objects.get(email=data["email"])
        return error_response(u"??????????????????")
    except MultipleObjectsReturned:
        return error_response(u"??????????????????")
    except User.DoesNotExist:
        user = User.objects.create(username=data["username"],
                                   real_name=data["real_name"],
                                   email=data["email"])
        user.set_password(data["password"])
        user.save()
        UserProfile.objects.create(user=user,
                                   school=data["school"],
                                   student_id=data["student_id"])
        return success_response(u"?????")
def clean(self):
    """
    Run the layer-2 trace between the two submitted hosts.

    The query object is kept on ``self.l2tracer``. Raises
    ``forms.ValidationError`` when the trace reports an ambiguous host match.
    """
    cleaned_data = super(L2TraceForm, self).clean()

    source_host = cleaned_data.get('host_from')
    target_host = cleaned_data.get('host_to')
    self.l2tracer = L2TraceQuery(source_host, target_host)

    try:
        self.l2tracer.trace()
    except MultipleObjectsReturned:
        raise forms.ValidationError(u"Input was ambiguous, matching multiple hosts")

    return cleaned_data
def subscribe(request):
    """
    General entry point for subscribers to enter drip.

    A valid POST creates a subscriber unless one (or several) already exist
    for the lower-cased email. Every path ends in a redirect: to the
    session's ``redirect_to`` after a new subscription when it is set,
    otherwise to the index page.
    """
    if request.method != 'POST':
        return HttpResponseRedirect(reverse_lazy("index"))

    form = DripSubscriberForm(request.POST)
    if not form.is_valid():
        return HttpResponseRedirect(reverse_lazy("index"))

    try:
        DripSubscriber.objects.get(email=form.cleaned_data["email"].lower())
        already_subscribed = True
    except MultipleObjectsReturned:
        already_subscribed = True
    except DripSubscriber.DoesNotExist:
        already_subscribed = False

    if already_subscribed:
        messages.info(request, "subscription-exists")
        return HttpResponseRedirect(reverse_lazy('index'))

    DripSubscriber.objects.create(
        email=form.cleaned_data["email"].lower(),
        funnel_entry_point=form.cleaned_data["funnel_entry_point"]
    )
    messages.success(request, "subscription-success")

    if request.session.get('redirect_to', False):
        return HttpResponseRedirect(request.session.get('redirect_to', reverse_lazy("index")))
    return HttpResponseRedirect(reverse_lazy('index'))
def clean(self):
    """
    Resolve the VLAN named in the CSV row and store it on the instance.

    With a VLAN group the lookup is by site, group name and VID; without a
    group it is by site and VID among VLANs that have no group. Nothing is
    done when no VID is supplied. Raises ``forms.ValidationError`` when the
    VLAN is missing or ambiguous.
    """
    super(PrefixCSVForm, self).clean()

    site = self.cleaned_data.get('site')
    vlan_group = self.cleaned_data.get('vlan_group')
    vlan_vid = self.cleaned_data.get('vlan_vid')

    # Validate VLAN
    if not vlan_vid:
        return

    if vlan_group:
        try:
            self.instance.vlan = VLAN.objects.get(site=site, group__name=vlan_group, vid=vlan_vid)
        except VLAN.DoesNotExist:
            message = (
                "VLAN {} not found in site {} group {}".format(vlan_vid, site, vlan_group)
                if site else
                "Global VLAN {} not found in group {}".format(vlan_vid, vlan_group)
            )
            raise forms.ValidationError(message)
        except MultipleObjectsReturned:
            raise forms.ValidationError(
                "Multiple VLANs with VID {} found in group {}".format(vlan_vid, vlan_group)
            )
    else:
        try:
            self.instance.vlan = VLAN.objects.get(site=site, group__isnull=True, vid=vlan_vid)
        except VLAN.DoesNotExist:
            message = (
                "VLAN {} not found in site {}".format(vlan_vid, site)
                if site else
                "Global VLAN {} not found".format(vlan_vid)
            )
            raise forms.ValidationError(message)
        except MultipleObjectsReturned:
            raise forms.ValidationError("Multiple VLANs with VID {} found".format(vlan_vid))
def resource_from_data(self, fk_resource, data, request=None, related_obj=None, related_name=None):
    """
    Given a dictionary-like structure is provided, a fresh related resource
    is created using that data.

    When ``data`` carries a ``pk`` or a unique field and the related resource
    allows updates, an existing object is updated first (by the full data,
    then by the unique keys only). If that fails, or is not permitted, a
    hydrated and validated bundle is returned instead.
    """
    # Try to hydrate the data provided.
    data = dict_strip_unicode_keys(data)
    fk_bundle = fk_resource.build_bundle(
        data=data,
        request=request
    )

    if related_obj:
        fk_bundle.related_obj = related_obj
        fk_bundle.related_name = related_name

    # .items() works on both Python 2 and 3 (iteritems() is Python 2 only).
    unique_keys = {
        k: v
        for k, v in data.items()
        if k == 'pk' or (hasattr(fk_resource, k) and getattr(fk_resource, k).unique)
    }

    # If we have no unique keys, we shouldn't go look for some resource that
    # happens to match other kwargs. In the case of a create, it might be the
    # completely wrong resource.
    # We also need to check to see if updates are allowed on the FK resource.
    if unique_keys and fk_resource.can_update():
        try:
            return fk_resource.obj_update(fk_bundle, skip_errors=True, **data)
        except (NotFound, TypeError):
            try:
                # Attempt lookup by primary key
                return fk_resource.obj_update(fk_bundle, skip_errors=True, **unique_keys)
            except NotFound:
                pass
        except MultipleObjectsReturned:
            pass

    # If we shouldn't update a resource, or we couldn't find a matching
    # resource we'll just return a populated bundle instead
    # of mistakenly updating something that should be read-only.
    fk_bundle = fk_resource.full_hydrate(fk_bundle)
    fk_resource.is_valid(fk_bundle)
    return fk_bundle
def post(self, request):
    """
    Bulk-create or update users from whitespace-separated
    "username password" pairs found in ``request.data["updata"]``.

    Responds with the rendered output page listing the processed users.
    """
    tokens = request.data["updata"].split()

    # Consecutive tokens form (username, password) pairs; a trailing
    # unpaired token is ignored.
    credentials = set()
    for index in range(1, len(tokens), 2):
        credentials.add((tokens[index - 1], tokens[index]))

    output = []
    for username, password in credentials:
        email = username + "@acmoj.temp"

        # Reset the password of an already existing account.
        try:
            user = User.objects.get(username=username)
            user.set_password(password)
            user.save()
        except User.DoesNotExist:
            pass

        try:
            user = User.objects.get(email=email)
        except MultipleObjectsReturned:
            continue
        except User.DoesNotExist:
            user = User.objects.create(username=username, real_name=username, email=email)
            user.set_password(password)
            user.save()
            UserProfile.objects.create(user=user, school="unknown", student_id=9527)

        user.tps = password
        output.append(user)

    rendered = render(request, "oj/account/output.html", {"output": output})
    return success_response({"content": rendered.content})
def test_get_one_raises_model_error_if_query_result_is_too_big(self):
    """A query result with two rows raises the model's own error class."""
    with self.assertRaises(FakeModel.MultipleObjectsReturned):
        get_one(FakeQueryResult(FakeModel, list(range(2))))
def test_get_one_raises_generic_error_if_other_sequence_is_too_big(self):
    """A plain sequence with two items raises the generic error class."""
    with self.assertRaises(MultipleObjectsReturned):
        get_one(list(range(2)))
def free_task(self):
    """Free the task; allowed from the FREE, ASSIGNED and CREATED states."""
    try:
        self.__lock(
            self.worker_id,
            new_state=TASK_STATES["FREE"],
            initial_states=(
                TASK_STATES["FREE"],
                TASK_STATES["ASSIGNED"],
                TASK_STATES["CREATED"],
            ),
        )
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        details = (self.id, self.get_state_display())
        raise Exception("Cannot free task %d, state is %s" % details)
def assign_task(self, worker_id=None):
    """
    Assign the task to the worker identified by ``worker_id``.

    Falls back to the task's current ``worker_id`` when none is given.
    Allowed from the FREE and CREATED states.
    """
    target_worker = self.worker_id if worker_id is None else worker_id
    try:
        self.__lock(
            target_worker,
            new_state=TASK_STATES["ASSIGNED"],
            initial_states=(TASK_STATES["FREE"], TASK_STATES["CREATED"]),
        )
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        raise Exception("Cannot assign task %d" % (self.id))
def open_task(self, worker_id=None):
    """
    Open the task on the worker identified by ``worker_id``.

    Falls back to the task's current ``worker_id`` when none is given.
    Allowed from the FREE and ASSIGNED states.
    """
    target_worker = self.worker_id if worker_id is None else worker_id
    try:
        self.__lock(
            target_worker,
            new_state=TASK_STATES["OPEN"],
            initial_states=(TASK_STATES["FREE"], TASK_STATES["ASSIGNED"]),
        )
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        details = (self.id, self.get_state_display())
        raise Exception("Cannot open task %d, state is %s" % details)
def close_task(self, task_result=""):
    """
    Close the task, storing ``task_result`` first when one is given.

    Allowed from the OPEN state only. The task logs are gzipped afterwards.
    """
    if task_result:
        self.result = task_result
        self.save()

    try:
        self.__lock(
            self.worker_id,
            new_state=TASK_STATES["CLOSED"],
            initial_states=(TASK_STATES["OPEN"], ),
        )
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        details = (self.id, self.get_state_display())
        raise Exception("Cannot close task %d, state is %s" % details)

    self.logs.gzip_logs()
def cancel_task(self, user=None, recursive=True):
    """
    Cancel the task and, when ``recursive``, every subtask.

    When ``user`` is given it must be a superuser or the task owner.
    Allowed from the FREE, ASSIGNED, OPEN and CREATED states. The task logs
    are gzipped afterwards.
    """
    may_not_cancel = (
        user is not None
        and not user.is_superuser
        and self.owner.username != user.username
    )
    if may_not_cancel:
        raise Exception("You are not task owner or superuser.")

    try:
        self.__lock(
            self.worker_id,
            new_state=TASK_STATES["CANCELED"],
            initial_states=(
                TASK_STATES["FREE"],
                TASK_STATES["ASSIGNED"],
                TASK_STATES["OPEN"],
                TASK_STATES["CREATED"],
            ),
        )
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        details = (self.id, self.get_state_display())
        raise Exception("Cannot cancel task %d, state is %s" % details)

    if recursive:
        for subtask in self.subtasks():
            subtask.cancel_task(recursive=True)
    self.logs.gzip_logs()
def interrupt_task(self, recursive=True):
    """
    Set the task state to interrupted and, when ``recursive``, interrupt
    every subtask.

    Allowed from the OPEN state only. The task logs are gzipped afterwards.
    """
    try:
        self.__lock(
            self.worker_id,
            new_state=TASK_STATES["INTERRUPTED"],
            initial_states=(TASK_STATES["OPEN"], ),
        )
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        details = (self.id, self.get_state_display())
        raise Exception("Cannot interrupt task %d, state is %s" % details)

    if recursive:
        for subtask in self.subtasks():
            subtask.interrupt_task(recursive=True)
    self.logs.gzip_logs()
def timeout_task(self, recursive=True):
    """
    Set the task state to timeout and, when ``recursive``, interrupt every
    subtask.

    Allowed from the OPEN state only. The task logs are gzipped afterwards.

    :raises Exception: when the state transition cannot be performed
    """
    try:
        self.__lock(
            self.worker_id,
            new_state=TASK_STATES["TIMEOUT"],
            initial_states=(TASK_STATES["OPEN"], ),
        )
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        # The message used to say "interrupt" (copied from interrupt_task),
        # which misreported the failing operation.
        raise Exception("Cannot timeout task %d, state is %s" % (self.id, self.get_state_display()))

    if recursive:
        # NOTE(review): subtasks are interrupted rather than timed out;
        # presumably intentional -- confirm.
        for task in self.subtasks():
            task.interrupt_task(recursive=True)
    self.logs.gzip_logs()
def fail_task(self, task_result=""):
    """
    Fail this task, storing ``task_result`` first when one is given.

    Allowed from the OPEN state only. The task logs are gzipped afterwards.
    """
    if task_result:
        self.result = task_result
        self.save()

    try:
        self.__lock(
            self.worker_id,
            new_state=TASK_STATES["FAILED"],
            initial_states=(TASK_STATES["OPEN"], ),
        )
    except (MultipleObjectsReturned, ObjectDoesNotExist):
        details = (self.id, self.get_state_display())
        raise Exception("Cannot fail task %i, state is %s" % details)

    self.logs.gzip_logs()
def get_gift_num(self, gift):
    """
    Return the count stored for ``gift`` in this gift set.

    Returns 0 when the gift is not included or is included more than once.
    """
    try:
        included = IncludedGift.objects.get(gift_set=self, gift=gift)
    except (IncludedGift.DoesNotExist, MultipleObjectsReturned):
        return 0
    return included.count
def get_exclusive_user(self, event):
    """
    Return the helper currently holding this item for ``event``.

    Returns None when the inventory allows multiple assignments. Raises
    ``NotAssigned`` when there is no unreturned usage of the item and
    ``InvalidMultipleAssignment`` when there are several.
    """
    if self.inventory.multiple_assignments:
        return None

    try:
        usage = UsedItem.objects.get(item=self,
                                     helper__event=event,
                                     timestamp_returned=None)
        return usage.helper
    except UsedItem.DoesNotExist:
        raise NotAssigned
    except MultipleObjectsReturned:
        raise InvalidMultipleAssignment()
def from_schedule(cls, schedule):
    """
    Return the stored row matching ``schedule``'s event, latitude and
    longitude, or a new unsaved instance when there is none.

    When several rows match, all of them are deleted and a new unsaved
    instance is returned.
    """
    spec = dict(event=schedule.event,
                latitude=schedule.lat,
                longitude=schedule.lon)
    try:
        return cls.objects.get(**spec)
    except cls.DoesNotExist:
        pass
    except MultipleObjectsReturned:
        # Drop the duplicates; the instance built below takes their place.
        cls.objects.filter(**spec).delete()
    return cls(**spec)
def from_schedule(cls, schedule, period=SECONDS):
    """
    Return the stored row matching ``schedule``'s interval (clamped at zero
    seconds) and ``period``, or a new unsaved instance when there is none.

    When several rows match, all of them are deleted and a new unsaved
    instance is returned.
    """
    spec = {
        'every': max(schedule.run_every.total_seconds(), 0),
        'period': period,
    }
    try:
        return cls.objects.get(**spec)
    except cls.DoesNotExist:
        pass
    except MultipleObjectsReturned:
        # Drop the duplicates; the instance built below takes their place.
        cls.objects.filter(**spec).delete()
    return cls(**spec)
def from_schedule(cls, schedule):
    """
    Return the stored row matching ``schedule``'s original crontab fields,
    or a new unsaved instance when there is none.

    When several rows match, all of them are deleted and a new unsaved
    instance is returned.
    """
    spec = dict(
        minute=schedule._orig_minute,
        hour=schedule._orig_hour,
        day_of_week=schedule._orig_day_of_week,
        day_of_month=schedule._orig_day_of_month,
        month_of_year=schedule._orig_month_of_year,
    )
    try:
        return cls.objects.get(**spec)
    except cls.DoesNotExist:
        pass
    except MultipleObjectsReturned:
        # Drop the duplicates; the instance built below takes their place.
        cls.objects.filter(**spec).delete()
    return cls(**spec)
def put_detail(self, request, **kwargs):
    """
    Update an existing resource with the request data, or create it.

    ``obj_update`` is tried first; when it raises ``NotFound`` or
    ``MultipleObjectsReturned`` the call falls back to ``obj_create``.

    Responses:

    * existing resource, ``Meta.always_return_data = False`` (default):
      ``HttpNoContent`` (204 No Content)
    * existing resource, ``Meta.always_return_data = True``: the serialized
      data through ``create_response`` with its default response class
    * new resource: ``HttpCreated`` (201 Created), with a populated body of
      serialized data when ``Meta.always_return_data = True``
    """
    body = request.body if django.VERSION >= (1, 4) else request.raw_post_data
    content_type = request.META.get('CONTENT_TYPE', 'application/json')

    deserialized = self.deserialize(request, body, format=content_type)
    deserialized = self.alter_deserialized_detail_data(request, deserialized)
    bundle = self.build_bundle(data=dict_strip_unicode_keys(deserialized), request=request)

    try:
        updated_bundle = self.obj_update(bundle=bundle, **self.remove_api_resource_names(kwargs))

        if not self._meta.always_return_data:
            return http.HttpNoContent()

        updated_bundle = self.full_dehydrate(updated_bundle)
        updated_bundle = self.alter_detail_data_to_serialize(request, updated_bundle)
        return self.create_response(request, updated_bundle)
    except (NotFound, MultipleObjectsReturned):
        updated_bundle = self.obj_create(bundle=bundle, **self.remove_api_resource_names(kwargs))
        location = self.get_resource_uri(updated_bundle)

        if not self._meta.always_return_data:
            return http.HttpCreated(location=location)

        updated_bundle = self.full_dehydrate(updated_bundle)
        updated_bundle = self.alter_detail_data_to_serialize(request, updated_bundle)
        return self.create_response(request, updated_bundle,
                                    response_class=http.HttpCreated,
                                    location=location)
def patch_detail(self, request, **kwargs):
    """
    Update a resource in place through ``update_in_place``.

    Returns ``HttpAccepted`` (202 Accepted) when the resource is updated,
    ``HttpNotFound`` (404 Not Found) when it does not exist, and
    ``HttpMultipleChoices`` when the URI matches more than one resource.
    """
    request = convert_post_to_patch(request)
    lookup_bundle = self.build_bundle(request=request)

    # Partial data cannot be validated on its own, so a PUT is simulated:
    # load the original object (essentially ``get_detail``), dehydrate it,
    # then apply the submitted changes on top.
    try:
        obj = self.cached_obj_get(bundle=lookup_bundle, **self.remove_api_resource_names(kwargs))
    except ObjectDoesNotExist:
        return http.HttpNotFound()
    except MultipleObjectsReturned:
        return http.HttpMultipleChoices("More than one resource is found at this URI.")

    bundle = self.full_dehydrate(self.build_bundle(obj=obj, request=request))
    bundle = self.alter_detail_data_to_serialize(request, bundle)

    # Now update the bundle in-place.
    body = request.body if django.VERSION >= (1, 4) else request.raw_post_data
    content_type = request.META.get('CONTENT_TYPE', 'application/json')
    deserialized = self.deserialize(request, body, format=content_type)
    self.update_in_place(request, bundle, deserialized)

    if not self._meta.always_return_data:
        return http.HttpAccepted()

    bundle = self.full_dehydrate(bundle)
    bundle = self.alter_detail_data_to_serialize(request, bundle)
    return self.create_response(request, bundle, response_class=http.HttpAccepted)