我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用marshmallow.ValidationError()。
def create(name, description): """Create a new project. Example:
polyaxon project create --name=cats-vs-dogs --description=Image Classification with Deep Learning ``` """ try: project_config = ProjectConfig.from_dict(dict(name=name, description=description)) except ValidationError: Printer.print_error('Project name should contain only alpha numerical, "-", and "_".') sys.exit(1) try: project = PolyaxonClients().project.create_project(project_config) except (PolyaxonHTTPError, PolyaxonShouldExitError) as e: Printer.print_error('Could not create project `{}`.'.format(name)) Printer.print_error('Error message `{}`.'.format(e)) sys.exit(1) Printer.print_success("Project `{}` was created successfully with uuid `{}`.".format( project.name, project.uuid.hex))
```
def _deserialize(self, value, attr, data): if not is_collection(value): value = [value] result = [] errors = {} for idx, each in enumerate(value): try: result.append(self.container.deserialize(each)) except ValidationError as e: if e.data is not None: result.append(e.data) errors.update({idx: e.messages}) if errors: raise ValidationError(errors, data=result) return result
def unwrap_item(self, item): """ If the schema has an existing instance the id field must be set. :raises ValidationError: id field isn't present when required. """ id_in_item = 'id' in item if self.load_existing and not id_in_item: # Updating Resources must include type and id keys # http://jsonapi.org/format/1.1/#crud-updating raise ma.ValidationError([{'detail': '`data` object must include `id` key.', 'source': {'pointer': '/data'}}]) if not self.load_existing and id_in_item: # Don't support client side identifier generation at this time. raise ForbiddenIdError() return super().unwrap_item(item)
def unwrap_request(self, data, many): if 'data' not in data: raise ma.ValidationError('Object must include `data` key.') data = data['data'] data_is_collection = ma.utils.is_collection(data) if many: if not data_is_collection: raise ma.ValidationError([{'detail': '`data` object must be a collection.', 'source': {'pointer': '/data'}}]) return [self.unwrap_item(each) for each in data] if data_is_collection: raise ma.ValidationError([ {'detail': '`data` object must be an object, not a collection.', 'source': {'pointer': '/data'}}]) if data is None: # When updating relationships we need to specially handle the primary data being null. # http://jsonapi.org/format/1.1/#crud-updating-to-one-relationships raise NullPrimaryData() return self.unwrap_item(data)
def get_schema(self, *args, **kwargs): def dump(data, many=False, **kwargs): if many: return [{'id': i.id, 'name': i.name} for i in data], '' return {'id': data.id, 'name': data.name}, '' def load(data, partial=False): if not partial and data['id'] == 4: raise ma.ValidationError(message={'id': ['invalid value.']}) return data, '' schema = mock.Mock() schema.dump = mock.Mock(side_effect=dump) schema.load = mock.Mock(side_effect=load) return schema
def _deserialize(self, value: str, attr, data: dict) -> Job.JobStatus: value_to_check = value.upper() if value_to_check == "REGISTERED": status = Job.JobStatus.REGISTERED elif value_to_check == "WORKING": status = Job.JobStatus.WORKING elif value_to_check == "COMPLETED": status = Job.JobStatus.COMPLETED elif value_to_check == "ERROR": status = Job.JobStatus.ERROR else: raise ValidationError( message='Unknown value for job status enum', fields=attr ) return status
def validate_field_max_length(self, value, field_name, max_chars): """ Validates maximum length for the field :param value: field value :type value: str :param field_name: name of the field :type field_name: str :param max_chars: maximum number of chars for the field :type max_chars: int :return: :raise ValidationError: if len(value) is greater than max_chars """ if len(value) > max_chars: raise ValidationError( self.get_max_length_error_message( field_name=field_name, max_chars=max_chars ) )
def validate_field_is_one_of(value, field_name, choices): """ Validates value is an element of choices :param value: field value :type value: str :param field_name: name of the field :type field_name: str :param choices: list of choices for the field :type choices: list of str :return: :raise ValidationError: if choices doesn't contain value """ if value not in choices: raise ValidationError( 'El campo "{}" es incorrecto'.format(field_name) )
def validate_all_fields(self, data): validation_errors = [] for key in data.keys(): underscore_key = convert_camel_case_to_underscore(key) validate_method = getattr( self, 'validate_{}'.format(underscore_key), None ) try: if validate_method: validate_method(data[key]) except ValidationError as v: msg = get_error_message( field_name=key, value=data[key], error_msg=v.message ) validation_errors.append(msg) if validation_errors: raise ValidationError(validation_errors)
def validate_outpath(path): try: with tempfile.NamedTemporaryFile(mode='w', dir=path) as tfile: tfile.write('0') tfile.close() except Exception as e: if isinstance(e, OSError): if e.errno == errno.ENOENT: raise mm.ValidationError( "%s is not in a directory that exists" % path) elif e.errno == errno.EACCES: raise mm.ValidationError( "%s does not appear you can write to path" % path) else: raise mm.ValidationError( "Unknown OSError: {}".format(e.message)) else: raise mm.ValidationError( "Unknown Exception: {}".format(e.message))
def _validate(self, value): """ Parameters ---------- value : str filepath to validate you can write to that location Returns ------- None Raises ------ marshmallow.ValidationError If os.path.dirname cannot be applied, or if directory does not exist, or if you cannot write to that directory, or writing a temporary file there produces any crazy exception """ try: path = os.path.dirname(value) except Exception as e: # pragma: no cover raise mm.ValidationError( "%s cannot be os.path.dirname-ed" % value) # pragma: no cover validate_outpath(path)
def _validate(self, value): if not os.path.isdir(value): try: os.makedirs(value) if self.mode is not None: os.chmod(value, self.mode) except OSError as e: if e.errno == os.errno.EEXIST: pass else: raise mm.ValidationError( "{} is not a directory and you cannot create it".format( value) ) if self.mode is not None: try: assert((os.stat(value).st_mode & 0o777) == self.mode) except: raise mm.ValidationError( "{} does not have the mode ({}) that was specified ".format( value, self.mode) ) # use outputfile to test that a file in this location is a valid path validate_outpath(value)
def _validate(self, value): """ Parameters ---------- value : str value to validate """ if (not hasattr(logging, value) or type(getattr(logging, value)) is not int): raise mm.ValidationError( '{} is not a valid loglevel; try one of {}'.format( value, LogLevel.options)) # Would prefer this to be an argparse.Action subclass, but not yet sure how to implement this way logging.getLogger().setLevel(value)
def output(self,d,output_path=None): """method for outputing dictionary to the output_json file path after validating it through the output_schema_type Parameters ---------- d:dict output dictionary to output output_path: str path to save to output file, optional (with default to self.mod['output_json'] location) Raises ------ marshmallow.ValidationError If any of the output dictionary doesn't meet the output schema """ if output_path is None: output_path = self.args['output_json'] output_json = self.get_output_json(d) with open(output_path,'w') as fp: json.dump(output_json,fp)
def test_bad_output(tmpdir): file_out = tmpdir.join('test_output_bad.json') input_parameters = { 'output_json':str(file_out) } mod = ArgSchemaParser(input_data = input_parameters, output_schema_type = MyOutputSchema, args=[]) M=[[5,5],[7,2]] Mnp = np.array(M) output = { "a":"example", "b":"not a number", "M":Mnp } with pytest.raises(mm.ValidationError): mod.output(output)
def replace(cls, obj, field, value, state): """ This is method for replace operation. It is separated to provide a possibility to easily override it in your Parameters. Args: obj (object): an instance to change. field (str): field name value (str): new value state (dict): inter-operations state storage Returns: processing_status (bool): True """ if not hasattr(obj, field): raise ValidationError("Field '%s' does not exist, so it cannot be patched" % field) setattr(obj, field, value) return True
def set_pronouns(self, contact, original_data): subject_pronoun = original_data.get("pronoun") if subject_pronoun: if not isinstance(subject_pronoun, str): raise ValidationError("Pronoun must be a string") contact.pronouns = get_pronouns_by_subject(subject_pronoun) filters = original_data.get("pronouns") if filters: if isinstance(filters, list) and all(isinstance(f, str) for f in filters): contact.pronouns_list = [get_pronouns_by_subject(f) for f in filters] elif isinstance(filters, dict): contact.pronouns = get_pronouns_by_filters(filters) else: raise ValidationError( "Pronouns must be a list of subject pronoun strings, " "or an object of pronoun types." ) return contact
def get_pronouns_by_subject(subject_pronoun): """ Given a subject pronoun (as a string), return the Pronouns object that corresponds to that subject pronoun. This can be called with strings like "he", "she", and "it". If no Pronouns object is found that matches this subject pronoun, or there are multiple Pronouns objects that match, a ValidationError is raised. """ try: return Pronouns.query.filter_by(subject=subject_pronoun).one() except NoResultFound: raise ValidationError( 'No set of pronouns found for subject pronoun "{subject}"'.format( subject=subject_pronoun ) ) except MultipleResultsFound: raise ValidationError( 'Multiple sets of pronouns found for subject pronoun ' '"{subject}". Use more specific filters.'.format( subject=subject_pronoun ) )
def get_pronouns_by_filters(filters): """ Given a dictionary of pronoun filters, return the Pronouns object that corresponds to those pronouns pronoun. Some examples: {"subject": "he"} {"subject": "they", "reflexive": "themself"} {"subject": "they", "reflexive": "themselves"} {"possessive": "hers"} If no Pronouns object is found that matches these filters, or there are multiple Pronouns objects that match, a ValidationError is raised. """ try: return Pronouns.query.filter_by(**filters).one() except NoResultFound: raise ValidationError( "No set of pronouns found for filters." ) except MultipleResultsFound: raise ValidationError( "Multiple sets of pronouns found. Use more specific filters." )
def test_error_handling( self, exc, expected_error, expected_status_code, expected_message ): entrypoint = HttpEntrypoint('GET', 'url') entrypoint.expected_exceptions = ( ValidationError, ProductNotFound, OrderNotFound, TypeError, ) response = entrypoint.response_from_exception(exc) response_data = json.loads(response.data.decode()) assert response.mimetype == 'application/json' assert response.status_code == expected_status_code assert response_data['error'] == expected_error assert response_data['message'] == expected_message
def add_product(self, request): """ Add product to cache in every region This endpoint can be called in any region and will dispatch event which will be handled by indexer's `handle_product_added` in all regions """ try: payload = Product(strict=True).loads( request.get_data(as_text=True) ).data except ValidationError as err: return 400, json.dumps({ 'error': 'BAD_REQUEST', 'message': err.messages }) self.dispatch('product_added', Product(strict=True).dump(payload).data) return 200, ''
def order_product(self, request): """ HTTP entrypoint for ordering products This entrypoint can be called in any region but message will be published on a federated `fed.order_product` queue that is only consumed in `europe` region where master database and service with write permissions to it lives. """ try: payload = Order(strict=True).loads( request.get_data(as_text=True) ).data except ValidationError as err: return 400, json.dumps({ 'error': 'BAD_REQUEST', 'message': err.messages }) self.order_product_publisher( payload, routing_key=ROUTING_KEY_ORDER_PRODUCT ) return 200, ''
def _deserialize(self, value, attr, data): if value is None: return None if isinstance(value, Reference): if type(value) is not self.reference_cls: value = self.reference_cls(value.document_cls, value.pk) return value elif isinstance(value, self._document_implementation_cls): if not value.is_created: raise ValidationError( _("Cannot reference a document that has not been created yet.")) return self.reference_cls(value.__class__, value.pk) elif isinstance(value, dict): if value.keys() != {'cls', 'id'}: raise ValidationError(_("Generic reference must have `id` and `cls` fields.")) try: _id = ObjectId(value['id']) except ValueError: raise ValidationError(_("Invalid `id` field.")) return self._deserialize_from_mongo({ '_cls': value['cls'], '_id': _id }) else: raise ValidationError(_("Invalid value for generic reference field."))
def _deserialize(self, value, attr, data): embedded_document_cls = self.embedded_document_cls if isinstance(value, embedded_document_cls): return value # Handle inheritance deserialization here using `cls` field as hint if embedded_document_cls.opts.offspring and isinstance(value, dict) and 'cls' in value: to_use_cls_name = value.pop('cls') if not any(o for o in embedded_document_cls.opts.offspring if o.__name__ == to_use_cls_name): raise ValidationError(_('Unknown document `{document}`.').format( document=to_use_cls_name)) try: to_use_cls = embedded_document_cls.opts.instance.retrieve_embedded_document( to_use_cls_name) except NotRegisteredDocumentError as e: raise ValidationError(str(e)) return to_use_cls(**value) else: # `Nested._deserialize` calls schema.load without partial=True data, errors = self.schema.load(value, partial=True) if errors: raise ValidationError(errors, data=data) return self._deserialize_from_mongo(data)
def load(self, data, partial=False): # Always use marshmallow partial load to skip required checks loaded_data, err = self.schema.load(data, partial=True) if err: raise ValidationError(err) self._data = loaded_data # Map the modified fields list on the the loaded data self.clear_modified() for key in loaded_data: self._mark_as_modified(key) if partial: self._collect_partial_fields(data) else: self.not_loaded_fields.clear() # Must be done last given it modify `loaded_data` self._add_missing_fields()
def test_dump_only(self): @self.instance.register class Doc(Document): dl = fields.IntField() do = fields.IntField(dump_only=True) lo = fields.IntField(load_only=True) nope = fields.IntField(dump_only=True, load_only=True) with pytest.raises(marshmallow.ValidationError): Doc(do=1) with pytest.raises(marshmallow.ValidationError): Doc(nope=1) assert Doc(dl=1, lo=2).dump() == {'dl': 1} assert Doc(nope=marshmallow.missing, do=marshmallow.missing).dump() == {}
def test_route_naming(self): class MySchema(EmbeddedSchema): in_front = fields.IntField(attribute='in_mongo') MyDataProxy = data_proxy_factory('My', MySchema()) d = MyDataProxy() with pytest.raises(ValidationError): d.load({'in_mongo': 42}) d.load({'in_front': 42}) with pytest.raises(KeyError): d.get('in_mongo') assert d.get('in_front') == 42 d.set('in_front', 24) assert d._data == {'in_mongo': 24} assert d.get('in_front') == 24 assert d.dump() == {'in_front': 24} assert d.to_mongo() == {'in_mongo': 24}
def test_required(self): @self.instance.register class Person(Document): name = fields.StrField(required=True) birthday = fields.DateTimeField() person = Person(birthday=datetime(1968, 6, 9)) # required should be called during commit with pytest.raises(ValidationError) as exc: person.required_validate() assert exc.value.messages == {'name': ['Missing data for required field.']} person.name = 'Marty' person.required_validate()
def test_strict_embedded_document(self): @self.instance.register class StrictEmbeddedDoc(EmbeddedDocument): a = fields.IntField() @self.instance.register class NonStrictEmbeddedDoc(EmbeddedDocument): a = fields.IntField() class Meta: strict = False data_with_bonus = {'a': 42, 'b': 'foo'} with pytest.raises(exceptions.UnknownFieldInDBError): StrictEmbeddedDoc.build_from_mongo(data_with_bonus) non_strict_doc = NonStrictEmbeddedDoc.build_from_mongo(data_with_bonus) assert non_strict_doc.to_mongo() == data_with_bonus non_strict_doc.dump() == {'a': 42} with pytest.raises(exceptions.ValidationError) as exc: NonStrictEmbeddedDoc(a=42, b='foo') assert exc.value.messages == {'_schema': ['Unknown field name b.']}
def test_phone_number_field_load(number, expected, strict, passes): """Ensure that the Phone Number field can format numbers correctly.""" schema = PhoneNumberSchema(context={'strict_phone_validation': strict}) payload = {'phone': number} if passes: deserialized = schema.load(payload).data assert deserialized['phone'] == expected serialized = schema.dump(deserialized).data assert serialized['phone'] == expected else: error_msg = { 'phone': [("The value for phone ({}) is not a valid phone " "number.".format(number))] } with pytest.raises(ValidationError, message=error_msg): schema.load(payload)
def _deserialize(self, value, attr, data): """Deserializes a string into an Arrow object.""" if not self.context.get('convert_dates', True) or not value: return value value = super(ArrowField, self)._deserialize(value, attr, data) timezone = self.get_field_value('timezone') target = arrow.get(value) if timezone and text_type(target.to(timezone)) != text_type(target): raise ValidationError( "The provided datetime is not in the " "{} timezone.".format(timezone) ) return target
def invalid_fields(self, data, original_data): """Validator that checks if any keys provided aren't in the schema. Say your schema has support for keys ``a`` and ``b`` and the data provided has keys ``a``, ``b``, and ``c``. When the data is loaded into the schema, a :class:`marshmallow.ValidationError` will be raised informing the developer that excess keys have been provided. Raises: marshmallow.ValidationError: Raised if extra keys exist in the passed in data. """ errors = [] for field in original_data: # Skip nested fields because they will loop infinitely if isinstance(field, (set, list, tuple, dict)): continue if field not in self.fields.keys(): errors.append(field) if errors: raise ValidationError("Invalid field", field_names=errors)
def _field2method(field_or_factory, method_name, preprocess=None): def method(self, name, default=ma.missing, subcast=None, **kwargs): name = self._prefix + name if self._prefix else name missing = kwargs.pop('missing', None) or default if isinstance(field_or_factory, type) and issubclass(field_or_factory, ma.fields.Field): field = field_or_factory(missing=missing, **kwargs) else: field = field_or_factory(subcast=subcast, missing=missing, **kwargs) self._fields[name] = field parsed_key, raw_value = _get_from_environ(name, ma.missing) if raw_value is ma.missing and field.missing is ma.missing: raise EnvError('Environment variable "{}" not set'.format(parsed_key)) value = raw_value or field.missing if preprocess: value = preprocess(value, subcast=subcast, **kwargs) try: value = field.deserialize(value) except ma.ValidationError as err: raise EnvError('Environment variable "{}" invalid: {}'.format(name, err.args[0])) else: self._values[name] = value return value method.__name__ = str(method_name) # cast to str for Py2 compat return method
def filter(self, collection, data, **kwargs): """Parse data and apply filter.""" try: return self.apply(collection, self.parse(data), **kwargs) except ValidationError: return collection
def _deserialize(self, value, attr, data): try: return bson.ObjectId(value) except: raise ma.ValidationError('invalid ObjectId `%s`' % value)
def test_serialization_with_invalid_data(self, model): field = fields.MACAddressField() model.mac = 'random string' with pytest.raises(marshmallow.ValidationError) as e: field.serialize('mac', model) assert str(e.value) == "\"random string\" cannot be formatted as MAC."
def test_deserialization_with_invalid_data(self): field = fields.MACAddressField() value = 'random string' with pytest.raises(marshmallow.ValidationError) as e: field.deserialize(value) assert str(e.value) == "\"random string\" cannot be formatted as MAC."
def test_deserialization_with_empty_data(self): field = fields.MACAddressField() value = None with pytest.raises(marshmallow.ValidationError) as e: field.deserialize(value) assert str(e.value) == "Field may not be null."
def encode(model: structures.Model): try: serialization_result = model.Meta.schema().dumps(model) except AttributeError as e: raise exceptions.EncodingError(e) except marshmallow.ValidationError as e: raise exceptions.EncodingError(e.messages) else: if serialization_result.errors: raise exceptions.EncodingError(serialization_result.errors) else: return serialization_result.data
def decode(type, encoded_data: str): try: decoded_data = type.Meta.schema().loads(encoded_data) except (json.decoder.JSONDecodeError, TypeError, AttributeError): raise exceptions.DecodingError('Error while decoding.', encoded_data=encoded_data) except marshmallow.ValidationError as e: raise exceptions.DecodingError(e.messages) else: if decoded_data.errors: raise exceptions.DecodingError(decoded_data.errors) else: return type(**decoded_data.data)
def test_uri_dump_malformed(): schema = URISchema(strict=True) assert_that( calling(schema.dump).with_args( dict( foo="example", ), ), raises(ValidationError), )
def test_uri_load_malformed(): schema = URISchema(strict=True) assert_that( calling(schema.load).with_args( dict( foo="example", ), ), raises(ValidationError), )
def test_dump_invalid(): schema = LanguageSchema(strict=True) assert_that( calling(schema.dump).with_args(dict( language="english", )), raises(ValidationError), )
def test_load_invalid(): schema = LanguageSchema(strict=True) assert_that( calling(schema.load).with_args(dict( language="english", )), raises(ValidationError), )
def handle_exception(error): if isinstance(error, ValidationError) or\ isinstance(error, IncorrectTypeError): return make_marshmallow_error_output(error, 400) elif isinstance(error, MongoValidationError): return make_exception_error_output(error, 400) elif isinstance(error, DoesNotExist): return make_not_found_error_output(error) elif isinstance(error, NotUniqueError): return make_exception_error_output(error, 400) else: return make_exception_error_output(error, 500)
def must_not_be_blank(data): if not data: raise ValidationError('Data not provided.')
def _postprocess(self, data): # vault.envs format should be: # ENV_NAME secret/path:key env_map = ObjectDict() for env in data['env']: try: name, path_key = env.strip().split(' ', 1) path, key = path_key.strip().split(':', 1) except ValueError: raise ValidationError('Invalid vault env {}'.format(name), 'env') env_map[name] = (path, key) data['env'] = env_map return super()._postprocess(data)
def parse(cls, conf): schema = SpecificationSchema() try: parsed = schema.load(conf) except ValidationError as e: logger.exception('badwolf specification validation error') raise InvalidSpecification(str(e)) data = parsed.data spec = cls() for key, value in data.items(): setattr(spec, key, value) return spec
def parse_secretfile(self, path): if hasattr(path, 'read') and callable(path.read): envs = parse_secretfile(path) else: with open(path) as fd: envs = parse_secretfile(fd) env_map = {} for env in envs: try: name, path_key = env.strip().split(' ', 1) path, key = path_key.strip().split(':', 1) except ValueError: raise ValidationError('Invalid Secretfile env {}'.format(name)) env_map[name] = (path, key) self.vault.env.update(env_map)
def validate_name(self, value): if not 0 < len(value) < 30: raise ValidationError('name?????')