我们从 Python 开源项目中提取了以下 46 个代码示例,用于说明如何使用 rest_framework.exceptions.ValidationError()。
def post(self, request, *args, **kwargs):
    """Log a user in from the credentials carried in the request body.

    Args:
        request (rest_framework.request.Request): request whose ``data``
            is read for the ``email`` and ``password`` values.

    Returns:
        Response: the serialized user on success, or a 401 response with a
        ``detail`` message when authentication fails.

    Raises:
        serializers.ValidationError: if either credential is missing or empty.
    """
    credentials = request.data
    email = credentials.get('email', None)
    password = credentials.get('password', None)
    if not (email and password):
        raise serializers.ValidationError({'error': 'email and/or password not provided'})

    user = authenticate(email=email, password=password)
    if user is None:
        return Response({
            'detail': 'Unable to login with provided username/password'
        }, status=status.HTTP_401_UNAUTHORIZED)

    login(request, user)
    return Response(PFBUserSerializer(user).data)
def set_password(self, request, pk=None):
    """Detail ``POST`` endpoint that changes a user's password.

    The user is looked up by ``uuid=pk`` and re-authenticated with the
    ``oldPassword`` value before ``newPassword`` is applied.

    Args:
        request (rest_framework.request.Request): request carrying
            ``oldPassword`` and ``newPassword`` in its data.
        pk (str): value matched against the user's ``uuid``.

    Returns:
        Response: 200 response with a success ``detail`` message.

    Raises:
        ValidationError: if re-authentication fails or no new password is given.
    """
    payload = request.data
    old_password = payload.get('oldPassword')
    account = PFBUser.objects.get(uuid=pk)
    user = authenticate(email=account.email, password=old_password)
    if not user:
        raise ValidationError({'detail': 'Unable to complete password change'})

    new_password = payload.get('newPassword')
    if not new_password:
        raise ValidationError({'detail': 'Unable to complete password change'})

    user.set_password(new_password)
    user.save()
    return Response({'detail': 'Successfully changed password'},
                    status=status.HTTP_200_OK)
def test_financial_aid_model_unique(self):
    """
    FinancialAid objects must be unique per user and program
    """
    original = FinancialAidFactory.create()
    # Objects sharing only one half of the ("user", "tier_program__program")
    # pair are accepted:
    # same user, no tier program given (left to the factory)
    FinancialAidFactory.create(user=original.user)
    # same tier program, no user given (left to the factory)
    FinancialAidFactory.create(tier_program=original.tier_program)
    # Saving a change to the original object must not raise ValidationError
    original.income_usd = 100
    original.save()
    # A second object for the same user and tier program must be rejected
    duplicate_kwargs = dict(user=original.user, tier_program=original.tier_program)
    with self.assertRaises(ValidationError):
        FinancialAidFactory.create(**duplicate_kwargs)
def test_financial_aid_model_duplicate_if_reset(self):
    """
    A second FinancialAid for the same user and tier program is only accepted
    when the existing one is in the reset status
    """
    existing = FinancialAidFactory.create()
    non_reset_statuses = [
        aid_status for aid_status in FinancialAidStatus.ALL_STATUSES
        if aid_status != FinancialAidStatus.RESET
    ]
    # With the existing object in any non-reset status, a duplicate is rejected
    for aid_status in non_reset_statuses:
        existing.status = aid_status
        existing.save()
        with self.assertRaises(ValidationError):
            FinancialAidFactory.create(
                user=existing.user,
                tier_program=existing.tier_program,
            )
    # Once the existing object is reset, the duplicate is allowed
    existing.status = FinancialAidStatus.RESET
    existing.save()
    FinancialAidFactory.create(
        user=existing.user,
        tier_program=existing.tier_program,
    )
def test_update_education_different_profile(self):
    """
    Editing an education that belongs to a different profile must fail
    """
    with mute_signals(post_save):
        own_education = EducationFactory.create()
        foreign_education = EducationFactory.create()
    # Serialize our own education but point it at the other education's id
    payload = EducationSerializer(own_education).data
    payload['id'] = foreign_education.id
    serializer = ProfileSerializer(
        instance=own_education.profile,
        data={'education': [payload], 'work_history': []},
    )
    serializer.is_valid(raise_exception=True)
    with self.assertRaises(ValidationError) as ex:
        serializer.save()
    expected_detail = ["Education {} does not exist".format(foreign_education.id)]
    assert ex.exception.detail == expected_detail
def update_education(education_list, profile_id):
    """
    Create or update the given educations for a profile and delete the rest.

    Args:
        education_list (list): List of education dicts; a dict with an "id"
            updates that education, one without creates a new education.
        profile_id (int): User profile id.

    Raises:
        ValidationError: if an "id" does not match an education of this
            profile, or if a dict fails serializer validation.
    """
    kept_ids = set()
    for entry in education_list:
        entry_id = entry.get("id")
        existing = None
        if entry_id is not None:
            try:
                existing = Education.objects.get(profile_id=profile_id, id=entry_id)
            except Education.DoesNotExist:
                raise ValidationError("Education {} does not exist".format(entry_id))
        serializer = EducationSerializer(instance=existing, data=entry)
        serializer.is_valid(raise_exception=True)
        serializer.save(profile_id=profile_id)
        kept_ids.add(serializer.instance.id)

    # Anything on the profile that was not part of this update is removed
    Education.objects.filter(profile_id=profile_id).exclude(id__in=kept_ids).delete()
def validate(self, attrs):
    """
    Reject attempts to switch off filled_out or agreed_to_terms_of_service,
    and require a postal code for the United States and Canada
    """
    must_stay_true = (
        ('filled_out', "filled_out cannot be set to false"),
        ('agreed_to_terms_of_service', "agreed_to_terms_of_service cannot be set to false"),
    )
    for key, message in must_stay_true:
        if key in attrs and not attrs[key]:
            raise ValidationError(message)

    # Postal code is only required in United States and Canada
    needs_postal_code = attrs.get("country", "") in ("US", "CA")
    if needs_postal_code and not attrs.get("postal_code", ""):
        raise ValidationError("postal_code may not be blank")

    return super(ProfileFilledOutSerializer, self).validate(attrs)
def test_no_current_financial_aid(self):
    """
    A course run is not purchasable when its tier program is not current
    """
    course_run, user = create_purchasable_course_run()
    tier_program = course_run.course.program.tier_programs.first()
    tier_program.current = False
    tier_program.save()

    course_key = course_run.edx_course_key
    with self.assertRaises(ValidationError) as ex:
        get_purchasable_course_run(course_key, user)
    expected_message = (
        "Course run {} does not have a current attached financial aid application".format(
            course_key
        )
    )
    assert ex.exception.args[0] == expected_message
def test_financial_aid_for_user(self):
    """
    A course run is not purchasable unless a financial aid is attached for the given user
    """
    course_run, user = create_purchasable_course_run()
    tier_program = course_run.course.program.tier_programs.first()
    financial_aid = tier_program.financialaid_set.first()
    # Reassign the financial aid to a newly created user
    financial_aid.user = UserFactory.create()
    financial_aid.save()

    course_key = course_run.edx_course_key
    with self.assertRaises(ValidationError) as ex:
        get_purchasable_course_run(course_key, user)
    expected_message = (
        "Course run {} does not have a current attached financial aid application".format(
            course_key
        )
    )
    assert ex.exception.args[0] == expected_message
def test_financial_aid_terminal_status(self):
    """
    A course run is not purchasable while the financial aid is in a non-terminal status
    """
    course_run, user = create_purchasable_course_run()
    tier_program = course_run.course.program.tier_programs.first()
    financial_aid = tier_program.financialaid_set.first()
    course_key = course_run.edx_course_key
    expected_message = (
        "Course run {} does not have a current attached financial aid application".format(
            course_key
        )
    )

    non_terminal = set(FinancialAidStatus.ALL_STATUSES) - set(FinancialAidStatus.TERMINAL_STATUSES)
    for aid_status in non_terminal:
        financial_aid.status = aid_status
        financial_aid.save()
        with self.assertRaises(ValidationError) as ex:
            get_purchasable_course_run(course_key, user)
        assert ex.exception.args[0] == expected_message
def test_already_purchased(self, has_to_pay):
    """
    A course run with a fulfilled order can not be purchased again
    """
    course_run, user = create_purchasable_course_run()
    course_key = course_run.edx_course_key
    order = create_unfulfilled_order(course_key, user)

    # An unfulfilled order does not block the purchase
    assert course_run == get_purchasable_course_run(course_key, user)

    order.status = Order.FULFILLED
    order.save()
    with self.assertRaises(ValidationError) as ex:
        get_purchasable_course_run(course_key, user)

    assert ex.exception.args[0] == 'Course run {} is already purchased'.format(course_key)
    assert has_to_pay.call_count == 1
def validate(self, attrs):
    """Check that the host named in ``attrs`` may be installed via PXE.

    The host is resolved with ``self._get_host_by_fqdn(attrs)`` and, when all
    checks pass, stored on ``self.host``.

    Returns:
        The unchanged ``attrs``.

    Raises:
        ValidationError: if the host is not found, is not PXE installable,
            has no ``pxe_key``, or its ``pxe_key`` differs from the supplied one.
    """
    host = self._get_host_by_fqdn(attrs)

    # Pick the first failing check; the message is only built for that check.
    if host is None:
        problem = 'Host \'{}\' not found.'.format(attrs['certname'])
    elif not host.pxe_installable:
        problem = 'Host \'{}\' is marked as not installable via PXE.'.format(attrs['certname'])
    elif not host.pxe_key:
        problem = 'Host \'{}\' has no pxe_key.'.format(attrs['certname'])
    elif host.pxe_key != attrs['pxe_key']:
        problem = 'Supplied pxe_key \'{}\' does not match host \'{}\'.'.format(
            attrs['pxe_key'], attrs['certname'])
    else:
        problem = None

    if problem is not None:
        raise ValidationError(problem)

    self.host = host
    return attrs
def validate(self, attrs):
    """Verify that every file referenced by the batch entries was provided.

    Each entry of ``attrs['batch']`` may carry ``attached_files`` as a dict
    (its values are used) or a list; the collected names are compared with
    the keys of ``self.get_files()``.

    Raises:
        ValidationError: if ``attached_files`` is neither a dict nor a list,
            or if some referenced files are not provided.
    """
    attrs = super(BatchRequestSerializer, self).validate(attrs)

    referenced = []
    for entry in attrs['batch']:
        if 'attached_files' in entry:
            attached = entry['attached_files']
            if isinstance(attached, dict):
                names = attached.values()
            elif isinstance(attached, list):
                names = attached
            else:
                raise ValidationError({'attached_files': 'Invalid format.'})
            referenced.extend(names)

    missing = set(referenced).difference(self.get_files().keys())
    if missing:
        raise ValidationError('Some of files are not provided: {}'.format(', '.join(missing)))
    return attrs
def _process_attr(self, attr): params = re.findall( r'({result=(?P<name>\w+):\$\.(?P<value>[a-zA-Z0-9.*]+)})', attr ) if not params: return attr for url_param in params: if url_param[1] not in self.named_responses: raise ValidationError('Named request {} is missing'.format(url_param[1])) result = get_attribute( self.named_responses[url_param[1]]['_data'], url_param[2].split('.') ) if isinstance(result, list): result = ','.join(map(six.text_type, result)) if attr == url_param[0]: attr = result else: attr = attr.replace(url_param[0], str(result)) return attr
def __iter__(self):
    """Yield one ``BatchRequest`` per entry of the serialized batch.

    Each entry gets its ``data`` passed through ``self.updated_obj`` and its
    ``relative_url`` through ``self._process_attr``; a ``_body`` is then
    prepared according to the content type of the outer request.

    Raises:
        ValidationError: if the outer request's content type is not
            multipart form data, urlencoded form data or JSON.
    """
    for request_data in self.request_serializer.data['batch']:
        request_data['data'] = self.updated_obj(request_data['data'])
        request_data['relative_url'] = self._process_attr(request_data['relative_url'])

        content_type = self.request.content_type
        payload = request_data['data']
        if content_type.startswith('multipart/form-data'):
            body = self._prepare_formdata_body(payload, files=request_data.get('files', {}))
        elif content_type.startswith('application/x-www-form-urlencoded'):
            body = self._prepare_urlencoded_body(payload)
        elif content_type.startswith('application/json'):
            body = self._prepare_json_body(payload)
        else:
            raise ValidationError('Unsupported content type')

        request_data['_body'] = body
        yield BatchRequest(self.request, request_data)
def run_validators(self, value):
    """
    Run every validator in `self.validators` against `value`.

    Errors are gathered across validators and raised together as a single
    `ValidationError`; nothing is returned when all validators pass.
    """
    collected = []
    for check in self.validators:
        if hasattr(check, 'set_context'):
            check.set_context(self)

        try:
            check(value)
        except ValidationError as exc:
            # A mapping of fields to errors cannot be merged into a flat
            # list, so it is re-raised immediately.
            if isinstance(exc.detail, dict):
                raise
            collected += exc.detail
        except DjangoValidationError as exc:
            collected += get_error_detail(exc)

    if collected:
        raise ValidationError(collected)
def as_serializer_error(exc):
    """Convert a validation exception into a dict of error lists.

    Mapping details keep their keys, with every value that is not already a
    list or mapping wrapped in a list. List and scalar details are filed
    under ``api_settings.NON_FIELD_ERRORS_KEY``.
    """
    assert isinstance(exc, (ValidationError, DjangoValidationError))

    detail = get_error_detail(exc) if isinstance(exc, DjangoValidationError) else exc.detail

    if isinstance(detail, Mapping):
        # Field errors: make sure each value is a *list* of errors
        # (nested mappings are kept as they are).
        normalized = {}
        for key, value in detail.items():
            normalized[key] = value if isinstance(value, (list, Mapping)) else [value]
        return normalized

    # Lists and single values are both non-field errors.
    non_field_errors = detail if isinstance(detail, list) else [detail]
    return {
        api_settings.NON_FIELD_ERRORS_KEY: non_field_errors
    }
def run_validation(self, data=empty):
    """
    Validate `data` and return the validated value.

    Errors raised by the validators or by `.validate()` are passed through
    `as_serializer_error` and re-raised as a `ValidationError` carrying the
    resulting error dictionary.
    """
    (is_empty, data) = self.validate_empty_values(data)
    if is_empty:
        return data

    validated = self.to_internal_value(data)
    try:
        self.run_validators(validated)
        validated = self.validate(validated)
        assert validated is not None, '.validate() should return the validated data'
    except (ValidationError, DjangoValidationError) as exc:
        raise ValidationError(detail=as_serializer_error(exc))
    else:
        return validated
def is_valid(self, raise_exception=False):
    """Run validation once and report whether the initial data is valid.

    The outcome is cached on ``_validated_data`` / ``_errors``; the empty
    value used for both is a list rather than a dict.

    Raises:
        ValidationError: if there are errors and ``raise_exception`` is true.
    """
    assert hasattr(self, 'initial_data'), (
        'Cannot call `.is_valid()` as no `data=` keyword argument was '
        'passed when instantiating the serializer instance.'
    )

    already_validated = hasattr(self, '_validated_data')
    if not already_validated:
        try:
            self._validated_data = self.run_validation(self.initial_data)
        except ValidationError as exc:
            self._validated_data, self._errors = [], exc.detail
        else:
            self._errors = []

    has_errors = bool(self._errors)
    if has_errors and raise_exception:
        raise ValidationError(self.errors)
    return not has_errors
def validate(self, validated_data):
    """Replace the nested ingredient/measurement dicts with model instances.

    ``validated_data['ingredient']['uuid']`` is resolved to an ``Ingredient``.
    When a ``measurement`` entry is present its ``uuid`` is resolved to a
    ``Measurement``.

    Returns:
        The same ``validated_data`` dict, updated in place.

    Raises:
        ValidationError: if the measurement UUID does not match a Measurement.
    """
    ingredient_uuid = validated_data['ingredient']['uuid']
    # NOTE(review): a missing ingredient is not translated into a
    # ValidationError here; Ingredient.DoesNotExist propagates to the caller.
    ingredient = Ingredient.objects.get(uuid=ingredient_uuid)
    validated_data['ingredient'] = ingredient

    if 'measurement' in validated_data:
        measurement_details = validated_data.pop('measurement')
        measurement_uuid = measurement_details['uuid']
        try:
            measurement = Measurement.objects.get(uuid=measurement_uuid)
        except Measurement.DoesNotExist:
            # Catch the exception of the model actually being queried, and
            # include the offending UUID in the message.
            raise ValidationError(
                'Non-required Measurement UUID {} doesn\'t exist'.format(measurement_uuid))
        validated_data['measurement'] = measurement

    return validated_data
def validate(self, validated_data):
    """Resolve ``ingredient_compositions`` UUID dicts to a queryset.

    When the key is absent it is set to an empty list. Otherwise the
    ``uuid`` of each item is collected and the matching
    ``IngredientComposition`` objects are stored under the same key.

    Raises:
        ValidationError: if the number of matches differs from the number
            of UUIDs supplied.
    """
    if 'ingredient_compositions' not in validated_data:
        validated_data['ingredient_compositions'] = []
        return validated_data

    requested = validated_data.pop('ingredient_compositions')
    uuids = [item['uuid'] for item in requested]
    found = IngredientComposition.objects.filter(uuid__in=uuids)
    if found.count() != len(uuids):
        raise ValidationError('Not all ingredient composition UUIDs were found {}'.format(
            uuids))

    validated_data['ingredient_compositions'] = found
    return validated_data
def create(self, validated_data):
    """
    Create a User named "<application id>.<reseller name>" and then the
    Reseller owned by that user. The original docstring states the user
    is needed to create a token for the reseller.

    Raises:
        ValidationError: if a user with that username already exists.
    """
    application = self.initial_data['application']
    username = '{application_id}.{reseller_name}'.format(
        application_id=application.id,
        reseller_name=validated_data['name'],
    )

    user_model = get_user_model()
    if user_model.objects.filter(username=username).exists():
        raise ValidationError('Reseller with such name is already created')

    owner = user_model.objects.create(username=username)
    return Reseller.objects.create(owner=owner, application=application, **validated_data)
def update(self, request, *args, **kwargs):
    """
    Override update so that the membership's user can not be changed and
    only a role belonging to the membership's club can be assigned.

    The incoming data is validated as a partial update, so either field may
    be absent; a check is only applied to fields that were submitted.

    Raises:
        rest_exceptions.ValidationError: if the payload is invalid, tries to
            change the user, or names a role the club does not have.
    """
    club_membership = self.get_object()
    serializer = self.get_serializer(club_membership,
                                     data=request.data,
                                     partial=True)
    serializer.is_valid(raise_exception=True)
    validated = serializer.validated_data

    # With partial=True a field that was not sent is missing from the
    # validated data, so guard each lookup instead of indexing blindly.
    if 'user' in validated and club_membership.user != validated['user']:
        raise rest_exceptions.ValidationError(
            'You can not update the User!'
        )

    if 'club_role' in validated and not club_membership.club_role.club \
            .has_role(validated['club_role']):
        raise rest_exceptions.ValidationError(
            'Invalid Club Role ID for this Club!')

    return super(ClubMembershipViewSet, self).update(
        request, *args, **kwargs)
def add_club(self, request, pk=None):
    """
    Collaborate with another Club in this project.

    The club is identified by the ``club`` query parameter.

    Returns:
        Response: the serialized project after the club has been added.

    Raises:
        rest_exceptions.ValidationError: if the parameter is missing, is
            not an integer, or does not match an existing club.
    """
    project = self.get_object()

    try:
        club_id = int(request.query_params.get('club', -1))
    except (TypeError, ValueError):
        # A non-numeric value is a client error, not a server failure.
        raise rest_exceptions.ValidationError(
            'The club must be an integer ID!'
        )

    if club_id == -1:
        raise rest_exceptions.ValidationError(
            'The request must contain a club!'
        )

    try:
        club = models.Club.objects.get(id=club_id)
    except models.Club.DoesNotExist:
        raise rest_exceptions.ValidationError(
            'The club does not exist!'
        )

    project.add_club(club)
    serializer = serializers.ProjectSerializer(project)
    return Response(serializer.data)
def validate(self, attrs):
    """Validate the latitude/longitude pair.

    Each coordinate is taken from the validated attrs when present there and
    otherwise from the current instance (``None`` when there is no instance
    or it lacks the attribute).

    Returns:
        The attrs as returned by the parent ``validate``.

    Raises:
        ValidationError: if latitude is outside [-90, 90], longitude is
            outside [-180, 180], or only one of the two is set.
    """
    su = super().validate(attrs)
    instance = getattr(self, "instance", None)

    # Only fall back to the instance when the key is absent. Reading
    # `self.instance.<coord>` as an eager `.get()` default failed on create
    # (no instance) and discarded the submitted value.
    if "longitude" in su:
        longitude = su["longitude"]
    else:
        longitude = getattr(instance, "longitude", None)
    if "latitude" in su:
        latitude = su["latitude"]
    else:
        latitude = getattr(instance, "latitude", None)

    if latitude is not None and (latitude > 90 or latitude < -90):
        raise ValidationError("Latitude must be between -90 and 90")
    if longitude is not None and (longitude > 180 or longitude < -180):
        raise ValidationError("Longitude must be between -180 and 180")
    if (longitude is None) != (latitude is None):
        raise ValidationError(
            "If longitude is provided then latitude is required and vice versa. Both can be null.")
    return su
def run_validators(self, value):
    """
    Run every validator in `self.validators` against `value`.

    Errors are gathered across validators and raised together as a single
    `ValidationError`; nothing is returned when all validators pass.
    """
    collected = []
    for check in self.validators:
        if hasattr(check, 'set_context'):
            check.set_context(self)

        try:
            check(value)
        except ValidationError as exc:
            # A mapping of fields to errors cannot be merged into a flat
            # list, so it is re-raised immediately.
            if isinstance(exc.detail, dict):
                raise
            collected += exc.detail
        except DjangoValidationError as exc:
            collected += exc.messages

    if collected:
        raise ValidationError(collected)
def leave_close_claim(self, request, *args, **kwargs):
    """Ask for the deposit to be closed into a target account.

    :param request: request whose data holds ``{"target_account_id": account_id}``
    :return: Response carrying the info returned by
        ``deposit.leave_close_claim``; status 200 when it reports success,
        400 otherwise.
    :raises ValidationError: if the deposit or the target account is in an
        inoperable status, closing was already requested, or the target
        account does not exist.
    """
    deposit = self.get_object()
    if deposit.status in deposit.INOPERABLE_STATUSES:
        raise ValidationError('???????? ? ????????? ??????????')
    if deposit.status == deposit.STATUS_REQUESTED_CLOSING:
        raise ValidationError('?????? ?? ???????? ???????? ??? ??????.')

    target_account_id = request.data['target_account_id']
    accounts = fin_models.Account.objects
    if not accounts.filter(pk=target_account_id):
        raise ValidationError('?????????? ????? ?? ??????????.')

    account = accounts.get(pk=target_account_id)
    if account.status in fin_models.Account.INOPERABLE_STATUSES:
        raise ValidationError('???????? ? ????????? ?????? ??????????.')

    res, info = deposit.leave_close_claim(target_account_id)
    if res:
        response_status = status.HTTP_200_OK
    else:
        response_status = status.HTTP_400_BAD_REQUEST
    return Response(info, status=response_status)
def confirm(self, request, *args, **kwargs):
    """Set a new password for the user identified by a confirmation key.

    The ``key`` field has the form ``<md5 hexdigest>:<pk>``; the digest must
    equal the MD5 hex digest of that user's username.

    Returns:
        Response: empty 204 response once the password has been changed.

    Raises:
        ValidationError: if the payload is invalid, the key is malformed or
            does not match the user, or the two new passwords differ.
    """
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    data = serializer.validated_data

    try:
        key, pk = data['key'].rsplit(':', 1)
        user = self.get_queryset().get(pk=pk)
        key_matches = md5(user.username).hexdigest() == key
    except (TypeError, ValueError, get_user_model().DoesNotExist):
        raise ValidationError('???????? ????')
    # Explicit check instead of `assert`: assertions are stripped when Python
    # runs with -O, which would let any key through.
    if not key_matches:
        raise ValidationError('???????? ????')

    if data['new_password'] != data['new_password_confirm']:
        raise ValidationError('????? ?????? ?? ?????????')

    user.set_password(data['new_password'])
    user.save()
    return Response(status=status.HTTP_204_NO_CONTENT)
def change_password(self, request, *args, **kwargs):
    """Change the password of the object returned by ``self.get_object()``.

    Returns:
        Response: empty 204 response once the password has been changed.

    Raises:
        ValidationError: if the payload is invalid, the user is inactive,
            the old password is wrong, or the two new passwords differ.
    """
    serializer = ChangePasswordSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    passwords = serializer.validated_data

    user = self.get_object()
    if not user.is_active:
        raise ValidationError('???????????? ?????????????')
    if not user.check_password(passwords['old_password']):
        raise ValidationError('???????? ??????')

    new_password = passwords['new_password']
    if new_password != passwords['new_password_confirm']:
        raise ValidationError('????? ?????? ?? ?????????')

    user.set_password(new_password)
    user.save()
    return Response(status=status.HTTP_204_NO_CONTENT)
def post(self, request, *args, **kwargs):
    """Authenticate with username/password and return an auth token.

    Returns:
        Response: 200 response with the token key, the user's pk and the
        token creation time.

    Raises:
        ValidationError: if authentication fails.
        PermissionDenied: if the authenticated user is not active.
    """
    user = authenticate(userid=request.data['username'], password=request.data['password'])
    if not user:
        detail = "???? ?? ? ????. username? password? ?? ??????."
        raise ValidationError(detail=detail)

    if not user.is_active:
        detail = "?? ??? ??????."
        raise PermissionDenied(detail=detail)

    token, _ = Token.objects.get_or_create(user=user)
    payload = {"token": token.key,
               "user_pk": token.user_id,
               "created": token.created}
    return Response(payload, status=status.HTTP_200_OK)
def get(self, request, *args, **kwargs):
    """Activate the user whose ``hash_username`` equals the ``key`` query parameter.

    Returns:
        Response: 200 response with the user's pk, a detail message and the
        URL of the ``index`` view, rendered with ``member/activate.html``.

    Raises:
        Http404: if no user matches the key.
        ValidationError: if several users match or the key is missing.
    """
    try:
        key = request.query_params['key']
        user = MomoUser.objects.get(hash_username=key)
        user.is_active = True
        user.save()
    except MomoUser.DoesNotExist:
        raise Http404
    except (MultipleObjectsReturned, MultiValueDictKeyError):
        # Both failure modes are reported with the same message
        raise ValidationError(detail="?? ?? url? ?? ?????.")

    body = {"user_pk": user.pk,
            "detail": "user? ????????.",
            "url": reverse('index', request=request)}
    return Response(body, status=status.HTTP_200_OK, template_name='member/activate.html')
def create(self, request, *args, **kwargs):
    """Create a post from a request carrying a photo and/or a description.

    Returns:
        Response: 201 response with the serialized post.

    Raises:
        ValidationError: if neither ``photo`` nor ``description`` is given,
            or the serializer rejects the data.
    """
    data = request.data
    photo = data.get('photo')
    description = data.get('description')
    if not (photo or description):
        raise ValidationError(detail="photo? description ? ??? ?? ? ??????.")

    if photo:
        # Replace the value with the entry from the request's FILES
        data['photo'] = request.FILES['photo']

    serializer = PostSerializer(data=data)
    serializer.is_valid(raise_exception=True)
    self.perform_create(serializer)
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)
def verify_google_token(request):
    """Log in the user described by a Google ID token.

    The token is verified with ``client.verify_id_token``; its issuer and
    hosted domain are checked, then a user keyed by the token's email is
    fetched or created and logged in.

    Returns:
        Response: an empty response body on success.

    Raises:
        ValidationError: if no token is supplied.
        AuthenticationFailed: if verification raises ``AppIdentityError`` or
            the issuer / hosted domain is not the expected one.
    """
    token = request.data.get('token')
    if token is None:
        raise ValidationError({'detail': 'Auth token required.'})

    trusted_issuers = ('accounts.google.com', 'https://accounts.google.com')
    try:
        idinfo = client.verify_id_token(token, settings.GOOGLE_AUTH_KEY)
        if idinfo['iss'] not in trusted_issuers:
            raise crypt.AppIdentityError('Wrong issuer.')
        if idinfo.get('hd') != settings.GOOGLE_AUTH_HOSTED_DOMAIN:
            raise crypt.AppIdentityError('Wrong hosted domain.')
    except crypt.AppIdentityError as e:
        raise AuthenticationFailed(e)

    email = idinfo['email']
    defaults = {
        'email': email,
        'first_name': idinfo.get('given_name', ''),
        'last_name': idinfo.get('family_name', ''),
    }
    user, created = get_user_model().objects.get_or_create(
        username=email, defaults=defaults)
    user.backend = 'django.contrib.auth.backends.ModelBackend'
    login(request, user)
    return Response({})
def get(self, request):
    """Return the schema produced by ``BetterSchemaGenerator`` for this view.

    Raises:
        exceptions.ValidationError: if the generator returns no schema.
    """
    generator_options = dict(
        title=self.title,
        url=self.url,
        description=self.description,
        patterns=self.patterns,
        urlconf=self.urlconf,
        definitions=self.definitions,
        version=self.version,
        check_view_permissions=self.check_view_permissions,
    )
    generator = BetterSchemaGenerator(**generator_options)
    schema = generator.get_schema(request=request)
    if schema:
        return Response(schema)

    raise exceptions.ValidationError(
        'The schema generator did not return a schema Document'
    )
def create(self, validated_data):
    """Create a ``StripeCustomer`` for the requesting user.

    Returns:
        The new customer, or ``None`` when ``stripe_js_response`` is absent
        or empty.

    Raises:
        ValidationError: if Stripe reports an error; the failure is logged.
    """
    if not validated_data.get("stripe_js_response"):
        return None

    # Create a Customer
    try:
        user = self.context['request'].user
        stripe_js_response = validated_data.pop("stripe_js_response")
        customer = StripeCustomer.objects.create(
            user=user, stripe_js_response=stripe_js_response)
        customer.create_at_stripe()
    except stripe.StripeError as e:
        message = "[AA-Stripe] creating customer failed for user {user.id}: {error}".format(user=user, error=e)
        logging.error(message)
        raise ValidationError({"stripe_error": e._message})

    return customer
def post(self, request, join_id):
    """Add a player to the game identified by ``join_id``.

    When the new player brings the game to its ``num_players``, the game
    moves to the ``pick_wc`` state and a new round is started.

    Returns:
        Response: dicts describing the new player and the game.

    Raises:
        NotFound: if no game has this ``join_id``.
        ValidationError: if the game is not in the ``starting`` state.
    """
    try:
        game = Game.objects.get(join_id=join_id)
    except Game.DoesNotExist:
        raise NotFound("Invalid join_id",404)

    if game.game_state != 'starting':
        raise ValidationError("You can no longer join this game")

    data = request.data
    enforce_required_params(['name'],data)
    player = Player.objects.create(name=data['name'], game=game)

    lobby_is_full = game.num_players == game.players.count()
    if lobby_is_full:
        game.game_state = 'pick_wc'
        game.save()
        game.start_new_round()

    return Response({
        'player':player.get_dict(show_private=True),
        'game':game.get_dict()})
def post(self, request, join_id):
    """Play the submitted cards for the requesting user's player.

    Once every player but one has played, the game moves to the
    ``pick_win`` state.

    Returns:
        Response: ``{'status': "success"}``.

    Raises:
        ValidationError: if the game is not in the ``pick_wc`` state or the
            number of cards differs from the current black card's ``pick``.
    """
    data = request.data
    enforce_required_params(['cards[]'], data)
    player = request.user.player
    cards = data.getlist('cards[]')
    game = request.user.player.game

    if game.game_state != "pick_wc":
        raise ValidationError({'error':"Unable to play cards at this moment"},400)
    if game.current_black_card.pick != len(cards):
        raise ValidationError({'error':"incorrect number of decks", 'value':cards},400)

    def to_play_card(card):
        return PlayCard(player=player, card_id=card)

    PlayCard.objects.bulk_create(map(to_play_card, cards))

    # if everyone except the cardczar has played, state change
    everyone_played = game.players_played_count() == game.num_players - 1
    if everyone_played:
        game.game_state = "pick_win"
        game.save()
    return Response({'status':"success"})
def validate(self, attrs):
    """Check the band and source composition version in ``attrs``.

    Returns:
        The unchanged ``attrs``.

    Raises:
        ValidationError: if the requesting user is not among the band's
            members, or the source composition already has this band's id.
    """
    user = self.context['request'].user
    band = attrs['band']

    is_member = band.members.filter(user=user).exists()
    if not is_member:
        raise ValidationError('?? ?? ???????? ? ???? ??????.')

    source_version = attrs['source_composition_version']
    if source_version.composition.band_id == band.id:
        raise ValidationError(
            '?????? ???????? ?????????? ? ??????, ??????? ??? ?????????? ? ??? ???????????.'
        )

    return attrs
def commit(self, request, data, *args, **kwargs):
    """Commit the last diff version of a composition and send the result.

    The new composition version is serialized and sent to the composition's
    group with a 201 status.

    Raises:
        PermissionDenied: if the channel session carries no ``user``.
        ValidationError: if the composition has at most one diff version.
    """
    user_id = request.channel_session.get('user')
    if user_id is None:
        raise PermissionDenied

    composition_id = kwargs.get('composition_id')
    data['composition'] = composition_id

    diff_versions = DiffCompositionVersion.objects.filter(composition_id=composition_id)
    if diff_versions.count() <= 1:
        raise ValidationError('Nothing to commit.')

    diff_version = diff_versions.last()
    composition_version = self.perform_commit(diff_version, user_id)

    self.route_send(
        Group(self.COMPOSITION_GROUP_TEMPLATE % composition_id),
        CompositionVersionSerializer(composition_version).data,
        status.HTTP_201_CREATED
    )
def test_composite_serializer_can_handle_errors_during_update(self):
    """An error raised by a field setter during save surfaces as ValidationError."""

    class EngineSerializer(CompositeSerializer):
        class Meta:
            composite = Vehicle.engine

        def set_cylinders(self, instance, field, value):
            # Always fails
            assert False, 'Some error'

    payload = {
        'cylinders': 2,
    }
    engine = Engine(4, 2345, 'apple', 'petrol')

    serializer = EngineSerializer(engine, data=payload, partial=True)
    self.assertTrue(serializer.is_valid(), serializer.errors)

    with self.assertRaises(ValidationError):
        serializer.save()
def test_patch_update_to_list_with_new_list_with_nested_raises_for_a_bad_pk(self):
    """Updating ``options`` with id 5, which the vehicle was not built with, raises ValidationError."""
    existing_options = session.query(Option).filter(Option.id.in_([1, 2])).all()
    vehicle = Vehicle(
        name='Test vehicle',
        type=VehicleType.bus,
        engine=Engine(4, 1234, None, None),
        owner=session.query(Owner).get(1),
        other=VehicleOther(advertising_cost=4321),
        options=existing_options,
    )

    class VehicleSerializer(ModelSerializer):
        class Meta:
            model = Vehicle
            session = session
            fields = ('options', )
            extra_kwargs = {'options': {'allow_null': False}}

    payload = {'options': [{'id': 1, 'name': 'Test 1'}, {'id': 5, 'name': 'Test 5'}]}

    serializer = VehicleSerializer(instance=vehicle, data=payload, partial=True)
    self.assertTrue(serializer.is_valid(), serializer.errors)

    with self.assertRaises(ValidationError):
        serializer.update(vehicle, serializer.validated_data)
def test_update_generates_validation_error_when_required_many_to_one_instance_not_found(self):
    """Updating the non-nullable ``owner`` with id 1234 raises ValidationError."""
    existing_options = session.query(Option).filter(Option.id.in_([1, 2])).all()
    vehicle = Vehicle(
        name='Test vehicle',
        type=VehicleType.bus,
        engine=Engine(4, 1234, None, None),
        owner=session.query(Owner).get(1),
        other=VehicleOther(advertising_cost=4321),
        options=existing_options,
    )

    class VehicleSerializer(ModelSerializer):
        class Meta:
            model = Vehicle
            session = session
            fields = ('owner', )
            extra_kwargs = {'owner': {'allow_null': False}}

    payload = {'owner': {'id': 1234}}

    serializer = VehicleSerializer(instance=vehicle, data=payload, partial=True)
    self.assertTrue(serializer.is_valid(), serializer.errors)

    with self.assertRaises(ValidationError):
        serializer.update(vehicle, serializer.validated_data)
def get_object(self, validated_data, instance=None):
    """
    Return the model instance addressed by the primary keys found in
    `validated_data`, falling back to `instance` when there are none.

    When nothing is found, a new model instance is returned if
    `allow_create` is set, `None` is returned if `allow_null` is set, and
    otherwise a `ValidationError` is raised.
    """
    pks = self.get_primary_keys(validated_data)
    found = self.session.query(self.model).get(pks) if pks else instance

    if found is not None:
        return found
    if self.allow_create:
        return self.model()
    if self.allow_null:
        return None

    raise ValidationError('No instance of `{}` found with primary keys `{}`'.format(self.model.__name__, pks))
def create(self, validated_data):
    """Create a person with their locations and a relationship to the current user.

    ``locations`` and ``relationship_type`` are removed from
    ``validated_data``; the remaining items become the ``Person`` fields.
    The relationship runs from ``self.context['user'].person`` to the new
    person.

    Returns:
        The saved ``Person``.

    Raises:
        ValidationError: if ``locations`` is empty. This is checked before
            anything is saved, so a rejected request leaves no person behind.
    """
    locations = validated_data.pop('locations')
    relationship_type = validated_data.pop('relationship_type')

    # Validate before the first save; checking afterwards persisted a
    # Person even when the request was rejected.
    if not locations:
        raise ValidationError('People must have at least one address.')

    person = Person(**validated_data)
    person.save()

    for location_data in locations:
        location = Location(**location_data)
        location.save()
        associated_location = AssociatedLocation(
            location=location, person=person)
        associated_location.save()

    relationship = Relationship(
        relationship_type=relationship_type,
        from_person=self.context['user'].person,
        to_person=person
    )
    relationship.save()
    return person
def create(self, validated_data):
    """Create an ``AssociatedEvent`` for an existing or a newly created event.

    ``event_id`` is used when truthy; otherwise an event is created from
    ``event`` through ``EventSerializer``.

    Returns:
        The saved ``AssociatedEvent``.

    Raises:
        ValidationError: if neither ``event`` nor ``event_id`` is provided.
    """
    new_event = validated_data.pop('event', None)
    event = validated_data.pop('event_id', None)

    if not event:
        if not new_event:
            raise ValidationError({
                'event': 'You must have a new event or event_id attached to this associated event.'
            })
        event = EventSerializer().create(new_event)

    associated_event = AssociatedEvent(
        event=event,
        receiving_person=validated_data.get('receiving_person_id'),
        creating_person=self.context['user'].person
    )
    associated_event.save()
    return associated_event
def validate(self, data):
    """
    Convert a VK video dict to the rptp video format.

    Args:
        data: VK video dict.

    Returns:
        Dict with the title, preview, url, duration and views of the video.

    Raises:
        ValidationError: if a required key is missing from ``data``.
    """
    try:
        title = data['title']
        preview = data['photo_320']
        url = f"https://m.vk.com/video{data['owner_id']}_{data['id']}"
        duration = data['duration']
        views = data['views']
    except KeyError:
        raise ValidationError({'video': 'Video format is invalid.'})

    return {
        'title': title,
        'preview': preview,
        'url': url,
        'duration': duration,
        'views': views,
    }