我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用jsonschema.Draft4Validator()。
def validate_drydock_document(self, doc): """Validate a parsed document via jsonschema. If a schema for a document Kind is not available, the document is considered valid. Schema is chosen by the doc['kind'] field. Returns a empty list for valid documents, otherwise returns a list of all found errors :param doc: dictionary of the parsed document. """ doc_kind = doc.get('kind') if doc_kind in self.v1_doc_schemas: validator = jsonschema.Draft4Validator( self.v1_doc_schemas.get(doc_kind)) errors_found = [] for error in validator.iter_errors(doc): errors_found.append(error.message) return errors_found else: return []
def validate_drydock_document(self, doc): """Validate a parsed document via jsonschema. If a schema for a document Kind is not available, the document is considered valid. Schema is chosen by the doc['kind'] field. Returns a empty list for valid documents, otherwise returns a list of all found errors :param doc: dictionary of the parsed document. """ schemaname = doc.get('schema', '') (schema_ns, doc_kind, doc_version) = schemaname.split('/') errors_found = [] if doc_version == 'v1': if schemaname in self.v1_doc_schemas: validator = jsonschema.Draft4Validator( self.v1_doc_schemas.get(schemaname)) for error in validator.iter_errors(doc.get('data', [])): errors_found.append(error.message) return errors_found
def load_config(path: str) -> Dict: """ Loads a config file from the given path :param path: path as str :return: configuration as dictionary """ with open(path) as file: conf = json.load(file) if 'internals' not in conf: conf['internals'] = {} logger.info('Validating configuration ...') try: validate(conf, CONF_SCHEMA) return conf except ValidationError: logger.fatal('Configuration is not valid! See config.json.example') raise ValidationError( best_match(Draft4Validator(CONF_SCHEMA).iter_errors(conf)).message )
def test_api_metadata(self) -> None: """ Tests that the API metadata returns a correct response, and that the data can be validated against the endpoint's schema """ response = self.client.get(self.url, headers=self.headers) self.assertEqual(response.status_code, 200) data = json.loads(response.data.decode('utf-8')) validator = jsonschema.Draft4Validator( data['meta']['schema'] ) if not validator.is_valid(data['data']): self.fail( 'Errors were encountered while validating response: %s' % ( ', '.join( error for error in validator.iter_errors( data['data'] ) ) ) )
def test_validation_error( self, parameters: dict, service: Service ) -> None: """ :param parameters: The parameters that do not satisfy the JSON schema :param service: The service to test """ validator = mock.MagicMock(spec=JSONSchemaValidator) validator_factory = mock.MagicMock(return_value=validator) validator.is_valid = mock.MagicMock(return_value=False) validator.iter_errors = mock.MagicMock( return_value={'error': 'message'} ) self.request.get_json = mock.MagicMock(return_value=parameters) endpoint = JobsForServiceEndpoint( self.session, self.request, self.service_list, validator_factory ) with self.assertRaises(endpoint.Abort): endpoint.post(service) self.assertTrue(endpoint.errors)
def test_valid_post( self, parameters: dict, service: Service ) -> None: """ Test that the job can be correctly created :param parameters: The new job parameters :param service: The service for which the job is to be created """ validator = mock.MagicMock(spec=JSONSchemaValidator) validator_factory = mock.MagicMock(return_value=validator) validator.is_valid = mock.MagicMock(return_value=True) self.request.get_json = mock.MagicMock(return_value=parameters) endpoint = JobsForServiceEndpoint( self.session, self.request, self.service_list, validator_factory ) response = endpoint.post(service) self.assertEqual(response.status_code, 201) self.assertIn('Location', response.headers)
def test_patch_set_results_schema_matches( self, job: Job, new_results: dict ) -> None: """ Tests that the results can be set correctly if they pass JSON schema validation """ request_body = { 'results': new_results } self.request.get_json = mock.MagicMock(return_value=request_body) validator = mock.MagicMock(spec=Draft4Validator) validator.is_valid = mock.MagicMock(return_value=True) endpoint = JobDetail( self.session, flask_request=self.request, validator_factory=mock.MagicMock(return_value=validator) ) response = endpoint.patch(job) self.assertEqual(200, response.status_code) self.assertEqual(job.results, new_results)
def notify(self, **kwargs: dict) -> Response: validator = jsonschema.Draft4Validator(self.schema) self._validate_schema(validator) env_prefix = kwargs.pop('env_prefix', None) environs = self._get_environs(env_prefix) if environs: kwargs = self._merge_dict_into_dict(kwargs, environs) self._validate_data(kwargs, validator) data = self._prepare_data(kwargs) data = self._merge_defaults(data) return self._send_notification(data) # Avoid premature import
def test_oneOf_and_anyOf_are_weak_matches(self): """ A property you *must* match is probably better than one you have to match a part of. """ validator = Draft4Validator( { "minProperties" : 2, "anyOf" : [{"type" : "string"}, {"type" : "number"}], "oneOf" : [{"type" : "string"}, {"type" : "number"}], } ) best = self.best_match(validator.iter_errors({})) self.assertEqual(best.validator, "minProperties")
def test_if_the_most_relevant_error_is_anyOf_it_is_traversed(self): """ If the most relevant error is an anyOf, then we traverse its context and select the otherwise *least* relevant error, since in this case that means the most specific, deep, error inside the instance. I.e. since only one of the schemas must match, we look for the most relevant one. """ validator = Draft4Validator( { "properties" : { "foo" : { "anyOf" : [ {"type" : "string"}, {"properties" : {"bar" : {"type" : "array"}}}, ], }, }, }, ) best = self.best_match(validator.iter_errors({"foo" : {"bar" : 12}})) self.assertEqual(best.validator_value, "array")
def test_if_the_most_relevant_error_is_oneOf_it_is_traversed(self): """ If the most relevant error is an oneOf, then we traverse its context and select the otherwise *least* relevant error, since in this case that means the most specific, deep, error inside the instance. I.e. since only one of the schemas must match, we look for the most relevant one. """ validator = Draft4Validator( { "properties" : { "foo" : { "oneOf" : [ {"type" : "string"}, {"properties" : {"bar" : {"type" : "array"}}}, ], }, }, }, ) best = self.best_match(validator.iter_errors({"foo" : {"bar" : 12}})) self.assertEqual(best.validator_value, "array")
def test_if_the_most_relevant_error_is_allOf_it_is_traversed(self): """ Now, if the error is allOf, we traverse but select the *most* relevant error from the context, because all schemas here must match anyways. """ validator = Draft4Validator( { "properties" : { "foo" : { "allOf" : [ {"type" : "string"}, {"properties" : {"bar" : {"type" : "array"}}}, ], }, }, }, ) best = self.best_match(validator.iter_errors({"foo" : {"bar" : 12}})) self.assertEqual(best.validator_value, "string")
def require_schema(schema): """This decorator verifies that request JSON matches given JSONSchema. http://json-schema.org """ validator = jsonschema.Draft4Validator( schema, format_checker=jsonschema.FormatChecker() ) def outer_decorator(func): @functools.wraps(func) def inner_decorator(self, *args, **kwargs): errors = validator.iter_errors(self.request_json) errors = [err.message for err in errors] if errors: LOG.warning("Cannot validate request: %s", errors) raise exceptions.InvalidJSONError(errors) return func(self, *args, **kwargs) return inner_decorator return outer_decorator
def validate_json(instance, schema): """Validate an instance under the given schema. :param instance: the instance to validate :type instance: dict :param schema: the schema to validate with :type schema: dict :returns: list of errors as strings :rtype: [str] """ def sort_key(ve): return six.u(_hack_error_message_fix(ve.message)) validator = jsonschema.Draft4Validator(schema) validation_errors = list(validator.iter_errors(instance)) validation_errors = sorted(validation_errors, key=sort_key) return [_format_validation_error(e) for e in validation_errors] # TODO(jsancio): clean up this hack # The error string from jsonschema already contains improperly formatted # JSON values, so we have to resort to removing the unicode prefix using # a regular expression.
def is_datafile_valid(datafile): """ Given a datafile determine if it is valid or not. Args: datafile: JSON string representing the project. Returns: Boolean depending upon whether datafile is valid or not. """ try: datafile_json = json.loads(datafile) except: return False try: jsonschema.Draft4Validator(constants.JSON_SCHEMA).validate(datafile_json) except: return False return True
def check_schema(body, schema): """Ensure all necessary keys are present and correct in create body. Check that the user-specified create body is in the expected format and include the required information. :param body: create body :raises InvalidParameterValue: if validation of create body fails. """ validator = jsonschema.Draft4Validator( schema, format_checker=jsonschema.FormatChecker()) try: validator.validate(body) except jsonschema.ValidationError as exc: raise exception.InvalidParameterValue(_('Invalid create body: %s') % exc)
def post(self): store_json = request.json error = best_match(Draft4Validator(schema).iter_errors(store_json)) if error: return jsonify({"error": error.message}), 400 else: store = Store( external_id=str(uuid.uuid4()), neighborhood=store_json.get('neighborhood'), street_address=store_json.get('street_address'), city=store_json.get('city'), state=store_json.get('state'), zip=store_json.get('zip'), phone=store_json.get('phone') ).save() response = { "result": "ok", "store": store_obj(store) } return jsonify(response), 201
def put(self, store_id): store = Store.objects.filter(external_id=store_id, live=True).first() if not store: return jsonify({}), 404 store_json = request.json error = best_match(Draft4Validator(schema).iter_errors(store_json)) if error: return jsonify({"error": error.message}), 400 else: store.neighborhood = store_json.get('neighborhood') store.street_address = store_json.get('street_address') store.city = store_json.get('city') store.state = store_json.get('state') store.zip = store_json.get('zip') store.phone = store_json.get('phone') store.save() response = { "result": "ok", "store": store_obj(store) } return jsonify(response), 200
def validate_schema(payload, schema): """Validates the payload against a defined json schema for the requested endpoint. :param payload: incoming request data :type payload: dict :param schema: the schema the request payload should be validated against :type schema: .json file :returns: errors if any :rtype: list """ errors = [] validator = jsonschema.Draft4Validator(schema, format_checker=jsonschema.FormatChecker()) for error in sorted(validator.iter_errors(payload), key=str): errors.append(error.message) return errors
def load_validator(schema_path, schema): """Create a JSON schema validator for the given schema. Args: schema_path: The filename of the JSON schema. schema: A Python object representation of the same schema. Returns: An instance of Draft4Validator. """ # Get correct prefix based on OS if os.name == 'nt': file_prefix = 'file:///' else: file_prefix = 'file:' resolver = RefResolver(file_prefix + schema_path.replace("\\", "/"), schema) validator = Draft4Validator(schema, resolver=resolver) return validator
def add(self, message): if isinstance(message, singer.RecordMessage): stream = self.ensure_stream(message.stream) if stream.latest_schema: validator_fn = extend_with_default(Draft4Validator) validator = validator_fn( stream.latest_schema, format_checker=FormatChecker()) validator.validate(copy.deepcopy(message.record)) else: print('I saw a record for stream {} before the schema'.format( message.stream)) exit(1) stream.num_records += 1 elif isinstance(message, singer.SchemaMessage): stream = self.ensure_stream(message.stream) stream.num_schemas += 1 stream.latest_schema = message.schema elif isinstance(message, singer.StateMessage): self.latest_state = message.value self.num_states += 1
def validate(schema_path: str, data: dict) -> Tuple[bool, str]: """ Validate the data with the specified schema. Return a tuple where the first value indicates whether the data is valid, and the second value contains an error message for the case where the data is invalid. """ with open(schema_path, 'r') as f: schema = json.loads(f.read()) # TODO: Cache schema and instantiate a Draft4Validator directly try: jsonschema.validate(data, schema, cls=jsonschema.Draft4Validator) except ValidationError as e: return False, e.message return True, None
def test_oneOf_and_anyOf_are_weak_matches(self): """ A property you *must* match is probably better than one you have to match a part of. """ validator = Draft4Validator( { "minProperties": 2, "anyOf": [{"type": "string"}, {"type": "number"}], "oneOf": [{"type": "string"}, {"type": "number"}], } ) best = self.best_match(validator.iter_errors({})) self.assertEqual(best.validator, "minProperties")
def test_if_the_most_relevant_error_is_anyOf_it_is_traversed(self): """ If the most relevant error is an anyOf, then we traverse its context and select the otherwise *least* relevant error, since in this case that means the most specific, deep, error inside the instance. I.e. since only one of the schemas must match, we look for the most relevant one. """ validator = Draft4Validator( { "properties": { "foo": { "anyOf": [ {"type": "string"}, {"properties": {"bar": {"type": "array"}}}, ], }, }, }, ) best = self.best_match(validator.iter_errors({"foo": {"bar": 12}})) self.assertEqual(best.validator_value, "array")
def test_if_the_most_relevant_error_is_oneOf_it_is_traversed(self): """ If the most relevant error is an oneOf, then we traverse its context and select the otherwise *least* relevant error, since in this case that means the most specific, deep, error inside the instance. I.e. since only one of the schemas must match, we look for the most relevant one. """ validator = Draft4Validator( { "properties": { "foo": { "oneOf": [ {"type": "string"}, {"properties": {"bar": {"type": "array"}}}, ], }, }, }, ) best = self.best_match(validator.iter_errors({"foo": {"bar": 12}})) self.assertEqual(best.validator_value, "array")
def test_if_the_most_relevant_error_is_allOf_it_is_traversed(self): """ Now, if the error is allOf, we traverse but select the *most* relevant error from the context, because all schemas here must match anyways. """ validator = Draft4Validator( { "properties": { "foo": { "allOf": [ {"type": "string"}, {"properties": {"bar": {"type": "array"}}}, ], }, }, }, ) best = self.best_match(validator.iter_errors({"foo": {"bar": 12}})) self.assertEqual(best.validator_value, "string")
def test_draft3_schema_draft4_validator(self): stdout, stderr = StringIO(), StringIO() with self.assertRaises(SchemaError): cli.run( { "validator": Draft4Validator, "schema": { "anyOf": [ {"minimum": 20}, {"type": "string"}, {"required": True}, ], }, "instances": [1], "error_format": "{error.message}", }, stdout=stdout, stderr=stderr, )
def __init__(self, versions, path, skip_unknown=False, min_date='2016.09.01', future_hours=24): """ ????? ??? ????????? ?????????? ?? ??? ?? json-?????. :param versions: ?????????????? ?????? ?????????, ???????? ['1.0', '1.05']. :param path: ???? ?? ??????????, ??????? ???????? ??? ?????????? ?? ???????, ???????? ?? ???????, ????????, ????? ??? ????????? 1.0 ?????? ?????? ? <path>/1.0/ :param skip_unknown: ???? ????? ?????? ?????????? ?? ?????????????? ?????????? ????????? """ self._validators = {} self._skip_unknown = skip_unknown schema_dir = os.path.expanduser(path) schema_dir = os.path.abspath(schema_dir) self.min_date = datetime.datetime.strptime(min_date, '%Y.%m.%d') if min_date else None self.future_hours = future_hours for version in versions: full_path = os.path.join(schema_dir, version, 'document.schema.json') with open(full_path, encoding='utf-8') as fh: schema = json.loads(fh.read()) resolver = jsonschema.RefResolver('file://' + full_path, None) validator = Draft4Validator(schema=schema, resolver=resolver) validator.check_schema(schema) # ?????????, ??? ???? ????? - ???????? self._validators[version] = validator
def __init__(self, schema): self.validator = Draft4Validator(schema)
def validate_json(self, json, schema): errors = [] v = Draft4Validator(schema) for error in sorted(v.iter_errors(json), key=str): errors.append('=> ' + str(error)) if len(errors) == 0: return True err = AssertionError('\n\n'.join(errors)) err.reasons = ['JSONSchema validation failed for the reasons above'] raise err
def build_validator(schema, path): handlers = {'': _URISchemaHandler(path)} resolver = RefResolver.from_schema(schema, handlers=handlers) return Draft4Validator(schema, resolver=resolver)
def test_valid(list_name,list_data): validator = Draft4Validator(list_schema) errors = [] for error in validator.iter_errors(list_data): errors.append("{} at {}".format(error.message, "/".join(error.path))) assert errors == []
def validate(config): #schema_path = os.path.join(os.path.dirname(__file__), 'config_schema.json') #with open(schema_path, 'r') as ff: # schemajson = ff.read() schema = json.loads(sd2_config_schema) validator = jsonschema.Draft4Validator(schema) jsonschema.validate(config, schema) if not validator.is_valid(config): sys.stderr.write("Error: configuration file is not valid\n") for error in validator.iter_errors(config): sys.stderr.write(error.message + '\n') sys.exit(1)