我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用jsonschema.SchemaError()。
def __init__(self, path=None): self.path = path or os.getenv('WEBHDFS_CONFIG', self.default_path) if osp.exists(self.path): try: self.config = json.loads(open(self.path).read()) self.schema = json.loads(resource_string(__name__, 'resources/config_schema.json')) #self.schema = open("resources/schema.config").read() try: js.validate(self.config, self.schema) except js.ValidationError as e: print e.message except js.SchemaError as e: print e except ParsingError: raise HdfsError('Invalid configuration file %r.', self.path) _logger.info('Instantiated configuration from %r.', self.path) else: raise HdfsError('Invalid configuration file %r.', self.path)
def validate_config(fname): """ Validate configuration file in json format. """ # Load schema schema_fname = pkg_resources.resource_filename('export2hdf5', "config_schema.json") schema = load_json_file(schema_fname) config = load_json_file(fname) res = None try: jsonschema.validate(config, schema) except jsonschema.ValidationError as e: res = e.message except jsonschema.SchemaError as e: res = e return res
def validate(install_json): """Validate install.json file for required parameters""" # install.json validation try: with open(install_json) as fh: data = json.loads(fh.read()) validate(data, schema) print('{} is valid'.format(install_json)) except SchemaError as e: print('{} is invalid "{}"'.format(install_json, e)) except ValidationError as e: print('{} is invalid "{}"'.format(install_json, e)) # @staticmethod # def _wrap(data): # """Wrap any parameters that contain spaces # # Returns: # (string): String containing parameters wrapped in double quotes # """ # if len(re.findall(r'[!\-\s\$]{1,}', data)) > 0: # data = '"{}"'.format(data) # return data
def validate(self, descriptor, schema_id): """ Validate a descriptor against a schema template :param descriptor: :param schema_id: :return: """ try: jsonschema.validate(descriptor, self.load_schema(schema_id)) return True except ValidationError as e: log.error("Failed to validate Descriptor against schema '{}'" .format(schema_id)) self.error_msg = e.message log.error(e.message) return except SchemaError as e: log.error("Invalid Schema '{}'".format(schema_id)) self.error_msg = e.message log.debug(e) return
def test_validate_invalid_data(self, jsonschema_mock): paths = [(("a", 0), "\['a'\]\[0\]"), ((), "")] for path, details in paths: msg = "Invalid data: error." if details: msg += "\nField: {0}".format(details) with self.assertRaisesRegexp(ValueError, msg): jsonschema_mock.validate.side_effect = ValidationError( "error", path=path ) validators._validate_data(self.data, self.schema) msg = "Invalid schema: error." if details: msg += "\nField: {0}".format(details) with self.assertRaisesRegexp(ValueError, msg): jsonschema_mock.validate.side_effect = SchemaError( "error", schema_path=path ) validators._validate_data(self.data, self.schema)
def validate(self): data = self.get_data(self.data_file) schema = open(self.schema_file).read() try: # result may be needed, commented out in case it can be used # result = jsonschema.validate(json.loads(data), json.loads(schema)) jsonschema.validate(json.loads(data), json.loads(schema)) return (True, json.loads(data)) except jsonschema.ValidationError as e: return (False, "ValidationError: {0}".format(e.message)) except jsonschema.SchemaError as e: return (False, "SchemaError: {0}".format(e)) except Exception as e: return (False, "Unknown Error: {0}".format(e))
def main(arg1, arg2): with open(arg1) as f: data = json.load(f) with open(arg2) as f: schema = json.load(f) try: jsonschema.validate(data, schema) return 'JSON successfully validated.' except jsonschema.ValidationError as e: return e.message except jsonschema.SchemaError as e: return e
def is_valid(self): handlers = {'https': self.session_request_json, 'http': self.session_request_json} schema = await self.schema.raw_schema resolver = RefResolver.from_schema(schema, handlers=handlers) try: validate(self.data, schema, resolver=resolver) except (SchemaError, ValidationError): return False return True
def _validate_schema(self, validator: jsonschema.Draft4Validator): """ Validates provider schema for syntax issues. Raises :class:`SchemaError` if relevant :param validator: :class:`jsonschema.Draft4Validator` """ try: log.debug('validating provider schema') validator.check_schema(self.schema) except jsonschema.SchemaError as e: # todo generate custom errors when relevant raise SchemaError(schema_error=e.message, provider=self.provider_name, data=self.schema)
def validate_json_with_schema(data, schema): errors = [] try: validate(data, schema) except ValidationError as e: errors.append('Schema Validation Error! message [{}] does not validate against schema. Error [{}]'.format(data, e)) except SchemaError as e: errors.append('JSON Parse Error! Could not parse [{}]. Error [{}]'.format(data, e)) return errors
def validate_schema(self, schema): try: jsonschema.Draft4Validator.check_schema(schema) except jsonschema.SchemaError: raise exceptions.InvalidSchema('jsonschema')
def check(name, correct_schema): schema = validator.schema[name] cls = validator_for(schema) is_valid_schema = True try: cls.check_schema(schema) except SchemaError: is_valid_schema = False assert is_valid_schema assert schema == correct_schema # ????json
def validate(self, json, schema_name): """ ???? :param json: type(str) ??json :param schema_name: type(str) schema?? :return: (boolean, dict) ???????? (True, );??????? (False, err) err = { "key_name":"", "key_path":"", "msg": "" } :type json: dict :type schema_name: str :rtype (bool, str) """ err = { "key_name": "", "key_path": "", "msg": "" } try: jsonschema.validate(json, self.schema[schema_name], format_checker=jsonschema.FormatChecker()) return True, err except ValidationError, e: key_name = "" if e.path: key_name = e.path.pop().replace(".", "") key_path = str(list(e.path)).replace("[", "").replace("]", "").replace(", ", ".").replace("'", "") msg = e.message err['key_name'] = key_name err['key_path'] = key_path err['msg'] = msg return False, err except SchemaError: raise SchemaError("invalid schema in validator[%s], please checking" % schema_name) except KeyError: raise ValueError("not found schema name [%s] in schema config" % schema_name)
def test_raises_on_bad_schema(self): def fn(): pass self.assertRaises( jsonschema.SchemaError, validate_body({'required': 'bar'}), fn )
def test_raises_on_bad_schema(self): def fn(): pass self.assertRaises( jsonschema.SchemaError, validate_output({'required': 'bar'}), fn )
def validate_schema(schema): """Validate that 'schema' is correct. This validates against the jsonschema v4 draft. :raises jsonschema.SchemaError: if the schema is invalid. """ jsonschema.Draft4Validator.check_schema(schema)
def get_descriptor_type(self, descriptor): """ This function obtains the type of a descriptor. Its methodology is based on trial-error, since it attempts to validate the descriptor against the available schema templates until a success is achieved """ # Gather schema templates ids templates = {self.SCHEMA_PACKAGE_DESCRIPTOR, self.SCHEMA_SERVICE_DESCRIPTOR, self.SCHEMA_FUNCTION_DESCRIPTOR} # Cycle through templates until a success validation is return for schema_id in templates: try: jsonschema.validate(descriptor, self.load_schema(schema_id)) return schema_id except ValidationError: continue except SchemaError as error_detail: log.error("Invalid Schema '{}'".format(schema_id)) log.debug(error_detail) return
def add_schema(self, data_type: str, path: str) -> bool: """Reads a JSON schemes file from the given path and stores it in the internal dictionary. Args: data_type: The name of the data type (e.g., 'observation'). path: The path to the JSON schemes file. Returns: True if schemes has been added, False if not. """ if self._schema.get(data_type): return False schema_path = Path(self._schema_root_path, path) if not schema_path.exists(): self.logger.error('Schema file "{}" not found.' .format(schema_path)) return False with open(str(schema_path), encoding='utf-8') as data_file: try: schema = json.loads(data_file.read()) jsonschema.Draft4Validator.check_schema(schema) self._schema[data_type] = schema self.logger.debug('Loaded schema "{}"' .format(data_type)) except json.JSONDecodeError: self.logger.error('Invalid JSON file "{}"' .format(schema_path)) return False except jsonschema.SchemaError: self.logger.error('Invalid JSON schema "{}"' .format(schema_path)) return False return True
def init(cls): try: with open(Filter.FILTER_SCHEMA_FNAME) as f: schema = json.load(f) jsonschema.validate({}, schema) except jsonschema.ValidationError: cls.schema_validator = jsonschema.Draft4Validator(schema) except jsonschema.SchemaError as e: raise AppException('Failed loading filter validation schema.\n{}'.format(FILTER_SCHEMA_ERROR.format(e))) # except FileNotFoundError as e: except Exception as e: raise AppException('Failed loading filter validation schema.\n{}\n' 'Make sure the file are valid and in place.'.format(e))
def __init__(self, registry, schema_data): try: jsonschema.validate(schema_data, registry.schema) except jsonschema.SchemaError as e: print(e) raise SchemaError() self.registry = registry self.fields = [] for field_data in schema_data['fields']: self.fields.append(Field(self, field_data))
def parse_default(self, default_data): """Returns a function to get the default value for a field. Can be any of: * constant (e.g. None, 3, "hello") * object - function A function allows you to choose a default value based on other submitted values or to default to today's date. The returned function should have two parameters: * form - parsed field values. * path - path to the current field as a list. The returned function should return a value to use as the default. """ if default_data is None: # Default to None return wrap(None) elif isinstance(default_data, dict): # Run a function to get the default value name = default_data.get('name') if name is None: raise SchemaError() default = self.registry.get_default(self.type, name)(self, default_data) return default else: # Default to a constant try: # Need to parse the constant (e.g. dates) return wrap(self.parser(default_data)) except ValueError: raise SchemaError()
def parse_required(self, required_data): """Returns a function to determine if a field is required. Can be any of: * True - field is required * False - field is optional * object - custom function The function option is useful when a field is only required in certain situations (e.g. if another field is a particular value). The returned function should have two parameters: * form - parsed field values. * path - path to the current field as a list. Note: The path is currently just the name of the field. The returned function should return true if the field is required. Missing values are replaced with nulls when the field is parsed. An error is returned if a null value is given for a required field. """ if required_data is True: # Field is required (reject None) return required_f elif required_data is False: # Field is optional (accept None) return optional_f elif isinstance(required_data, dict): # Run a function to determine if this field is required name = required_data['name'] required = self.registry.get_required(name)(self, required_data) return required else: # Should never reach here raise SchemaError()
def parse_visible(self, visible_data): """Returns a function that determines if the field is visible. Fields that are hidden have their value set to None. Can be any of: * True - field is visible * False - field is hidden * object - custom function The function option can be used to show or hide a field based on the value of another field. The returned function should have two parameters: * form - parsed field values. * path - path to the current field as a list. Note: The path is currently just the name of the field. The returned function should return true if the field is visible. """ if visible_data is True: # Field is visible return visible_f elif visible_data is False: # Field is hidden (set value to None) return hidden_f elif isinstance(visible_data, dict): # Run a function to determine if this field is visible name = visible_data['name'] visible = self.registry.get_visible(name)(self, visible_data) return visible else: # Should never reach here raise SchemaError()
def parse_validators(self, validators_data): """Parse a list of validators.""" if not isinstance(validators_data, list): raise SchemaError() validators = [] for validator_data in validators_data: validators.append(self.parse_validator(validator_data)) return validators
def _validate_data(data, schema): """Validate the input data using jsonschema validation. :param data: a data to validate represented as a dict :param schema: a schema to validate represented as a dict; must be in JSON Schema Draft 4 format. """ try: jsonschema.validate(data, schema) except jsonschema.ValidationError as e: _raise_validation_error("data", e.message, e.path) except jsonschema.SchemaError as e: _raise_validation_error("schema", e.message, e.schema_path)
def validate_obj_schema(obj, schema): """ Validate an object with an schema, if false, returns error message """ try: Draft4Validator.check_schema(schema) Draft4Validator(schema).validate(obj) except jsonschema.ValidationError as val_err: return (False, {'msg' : str(val_err)}) except jsonschema.SchemaError as sch_err: return (False, {'msg' : str(sch_err)}) return (True, obj)
def __call__(self, req, session): denied = await self._authorize(req, session) if denied is not None: return denied response_headers = {'content-type': 'application/json'} try: body_params = await self._build_body_params(req) query_params = self._build_non_body_params(self._query_validator, req.query) path_params = self._build_non_body_params(self._path_validator, req.path_params) headers_params = self._build_non_body_params(self._headers_validator, dict(req.headers)) except (ValidationError, SchemaError) as error: return self._valdation_error_to_response(error, response_headers) req = SwaggerRequest( req.path, req.method, scheme=req.scheme, host=req.host, path_params=path_params, query=query_params, headers=headers_params, body=body_params, body_schema=self._body_validator.schema if self._body_validator else None, context=req.context ) try: if session is None: resp = await self._operation(req) else: resp = await self._operation(req, session) except (ValidationError, SchemaError) as error: return self._valdation_error_to_response(error, response_headers) except Exception as error: body = ujson.dumps({'message': 'Something unexpected happened'}) self._logger.exception('Unexpected') return SwaggerResponse(500, body=body, headers=response_headers) else: if resp.headers.get('content-type') is None: resp.headers.update(response_headers) return resp
def _validate_json(self, excel_config_json_path): try: with open(excel_config_json_path) as file_pointer: json_obj = json.load(file_pointer) # the file is not deserializable as a json object except ValueError as exc: self._plugin.log.error('Malformed JSON file: {0} \n {1}'.format(excel_config_json_path, exc)) raise exc # some os error occured (e.g file not found or malformed path string) # have to catch two exception classes: in py2 : IOError; py3: OSError except (IOError, OSError) as exc: self._plugin.log.error(exc) # raise only OSError to make error handling in caller easier raise OSError() # validate json object if schema file path is there; otherwise throw warning try: with open(JSON_SCHEMA_FILE_PATH) as file_pointer: schema_obj = json.load(file_pointer) # the file is not deserializable as a json object except ValueError as exc: self._plugin.log.error('Malformed JSON schema file: {0} \n {1}'.format(JSON_SCHEMA_FILE_PATH, exc)) raise exc # some os error occured (e.g file not found or malformed path string) # have to catch two exception classes: in py2 : IOError; py3: OSError except (IOError, OSError) as exc: self._plugin.log.error(exc) # raise only OSError to make error handling in caller easier raise OSError() # do the validation try: validate(json_obj, schema_obj) except ValidationError as error: self._plugin.log.error("Validation failed: {0}".format(error.message)) raise except SchemaError: self._plugin.log.error("Invalid schema file: {0}".format(JSON_SCHEMA_FILE_PATH)) raise return json_obj
def loadFiltersFromFile(cls, fname, validate_data): filters = [] cur_fname = fname try: with cls.filter_file_lock[fname]: with open(cur_fname, encoding="utf-8", errors="replace") as f: data = json.load(f) cur_fname = FILTERS_FILE_SCHEMA_FNAME # TODO: move schema loading to main init and store in class with open(cur_fname) as f: schema = json.load(f) # normalize keys and values data = lower_json(data) jsonschema.validate(data, schema) for item in data.get('filters', []): fltr = Filter.fromDict(item) filters.append(fltr) ver = data.get('version', FilterVersion.V1) if ver != FilterVersion.Latest: FilterManager.convert(ver, filters) last_update = data.get('last_update', '') if validate_data: for fltr in filters: fltr.validate() try: last_update = datetime.strptime(last_update, '%Y-%m-%dT%H:%M:%S.%f') except ValueError: last_update = datetime.utcnow() - timedelta(minutes=FilterManager.UPDATE_INTERVAL) except FileNotFoundError: if cur_fname != FILTERS_FILE_SCHEMA_FNAME: raise else: raise AppException(FILTER_FILE_MISSING.format(cur_fname)) except jsonschema.ValidationError as e: raise AppException(FILTERS_FILE_VALIDATION_ERROR.format(get_verror_msg(e, data))) except jsonschema.SchemaError as e: raise AppException(FILTERS_FILE_SCHEMA_ERROR.format(e.message)) except json.decoder.JSONDecodeError as e: raise AppException(FILTER_INVALID_JSON.format(e, cur_fname)) return filters, last_update