我们从 Python 开源项目中提取了以下 16 个代码示例，用于说明如何使用 jsonschema.FormatChecker()。
def require_schema(schema):
    """Build a decorator that checks ``self.request_json`` against *schema*.

    The ``Draft4Validator`` (with a ``FormatChecker``) is created once, when
    the decorator is built, and reused on every call of the decorated method.

    http://json-schema.org

    :param schema: JSON Schema handed to ``jsonschema.Draft4Validator``.
    :returns: a decorator for methods whose ``self`` exposes ``request_json``.
    :raises exceptions.InvalidJSONError: raised by the decorated method,
        carrying the list of validation messages, when the request JSON
        does not match the schema.
    """
    request_validator = jsonschema.Draft4Validator(
        schema, format_checker=jsonschema.FormatChecker())

    def outer_decorator(func):
        @functools.wraps(func)
        def inner_decorator(self, *args, **kwargs):
            messages = [
                problem.message
                for problem in request_validator.iter_errors(self.request_json)
            ]
            # Happy path first: no messages means the request is valid.
            if not messages:
                return func(self, *args, **kwargs)

            LOG.warning("Cannot validate request: %s", messages)
            raise exceptions.InvalidJSONError(messages)

        return inner_decorator

    return outer_decorator
def check_schema(body, schema):
    """Validate a user-specified create body against *schema*.

    :param body: create body to check.
    :param schema: JSON Schema handed to ``jsonschema.Draft4Validator``.
    :raises InvalidParameterValue: if validation of the create body fails;
        the message embeds the underlying validation error.
    """
    checker = jsonschema.FormatChecker()
    body_validator = jsonschema.Draft4Validator(schema, format_checker=checker)
    try:
        body_validator.validate(body)
    except jsonschema.ValidationError as validation_error:
        message = _('Invalid create body: %s') % validation_error
        raise exception.InvalidParameterValue(message)
def validate_schema(payload, schema):
    """Collect the validation error messages for *payload*.

    :param payload: incoming request data
    :param schema: JSON Schema handed to ``jsonschema.Draft4Validator``
    :returns: the ``message`` of every validation error, ordered by the
        string form of the error; empty when the payload is valid
    :rtype: list
    """
    checker = jsonschema.FormatChecker()
    draft4 = jsonschema.Draft4Validator(schema, format_checker=checker)
    # Sorting on str(error) keeps the output order deterministic.
    ordered = sorted(draft4.iter_errors(payload), key=str)
    return [problem.message for problem in ordered]
def add(self, message):
    """Record one Singer message and update the running counters.

    * ``singer.RecordMessage`` -- validated against the latest schema seen
      for its stream, then counted in ``stream.num_records``.  If no schema
      has been seen yet, a message is printed and the process exits with
      status 1.
    * ``singer.SchemaMessage`` -- counted and stored as the stream's
      ``latest_schema``.
    * ``singer.StateMessage`` -- stored as ``latest_state`` and counted.

    Any other message type is ignored.

    :param message: a parsed Singer message.
    :raises SystemExit: when a record arrives before its stream's schema.
    """
    if isinstance(message, singer.RecordMessage):
        stream = self.ensure_stream(message.stream)
        if not stream.latest_schema:
            print('I saw a record for stream {} before the schema'.format(
                message.stream))
            # Raise SystemExit directly: the bare ``exit()`` helper is
            # injected by the ``site`` module for interactive use and is not
            # guaranteed to exist (e.g. ``python -S`` or frozen apps).
            raise SystemExit(1)

        validator_fn = extend_with_default(Draft4Validator)
        validator = validator_fn(
            stream.latest_schema, format_checker=FormatChecker())
        # Validate a deep copy -- presumably because the default-filling
        # validator mutates the instance it checks (confirm against
        # extend_with_default).
        validator.validate(copy.deepcopy(message.record))
        stream.num_records += 1
    elif isinstance(message, singer.SchemaMessage):
        stream = self.ensure_stream(message.stream)
        stream.num_schemas += 1
        stream.latest_schema = message.schema
    elif isinstance(message, singer.StateMessage):
        self.latest_state = message.value
        self.num_states += 1
def is_valid_response(response, schema):
    """Tell whether *response* validates against *schema*.

    Any exception raised during validation is logged as a warning and
    reported as an invalid response rather than propagated.

    :param response: The response returned by the API
    :param schema: The schema to validate with
    :returns: True if the response validates with the schema, False otherwise
    """
    try:
        validate(response, schema, format_checker=FormatChecker())
    except Exception as exc:
        log.warning("Response is not valid: %s" % (exc,))
        return False
    return True
def validate(instance, schema, cls=None, *args, **kwargs):
    """Call jsonschema.validate() with format checking switched on.

    All arguments are forwarded unchanged to ``_validate`` and a fresh
    ``FormatChecker`` is supplied as ``format_checker``.  Passing
    ``format_checker`` in ``kwargs`` therefore raises ``TypeError``
    (duplicate keyword argument).
    """
    _validate(
        instance,
        schema,
        cls,
        *args,
        format_checker=FormatChecker(),
        **kwargs
    )
def __init__(self, schema, is_body=True):
    """Build a validator for *schema* with custom min/max handling.

    ``self.validator_org`` is extended so that the ``minimum`` and
    ``maximum`` keywords are handled by this object's own
    ``_validate_minimum`` / ``_validate_maximum`` methods.

    :param schema: JSON Schema to validate against.
    :param is_body: stored on the instance as ``self.is_body``.
    """
    self.is_body = is_body

    keyword_overrides = dict(
        minimum=self._validate_minimum,
        maximum=self._validate_maximum,
    )
    extended_cls = jsonschema.validators.extend(
        self.validator_org, keyword_overrides)

    self.validator = extended_cls(
        schema, format_checker=jsonschema.FormatChecker())
def validate_json(json_data, json_schema=None, json_example=None,
                  validator_cls=None,
                  format_checker=jsonschema.FormatChecker(),
                  on_empty_404=False):
    """Validate output data against *json_schema* and return it.

    :param json_data: the output to validate.
    :param json_schema: schema for the output; when ``None`` no validation
        is performed and ``json_data`` is returned untouched.
    :param json_example: not used by this function.
    :param validator_cls: forwarded to ``jsonschema.validate`` as ``cls``.
    :param format_checker: forwarded to ``jsonschema.validate``; the default
        is a single ``FormatChecker`` created when the function is defined.
    :param on_empty_404: when true, falsy output raises ``JsonError(404)``.
    :returns: ``json_data`` (converted by ``_to_json`` when a schema is given).
    :raises JsonError: 404 "Not found." for empty output with
        ``on_empty_404`` set.
    """
    # If output is empty, automatically answer with a 404 when requested.
    if not json_data and on_empty_404:
        raise JsonError(404, "Not found.")

    if json_schema is None:
        return json_data

    json_data = _to_json(json_data)

    # The output is wrapped in an object before validating in case it is a
    # string (and ergo not a validatable JSON object).
    wrapped_instance = {"result": json_data}
    wrapped_schema = {
        "type": "object",
        "properties": {
            "result": json_schema
        },
        "required": ["result"]
    }
    jsonschema.validate(
        wrapped_instance,
        wrapped_schema,
        cls=validator_cls,
        format_checker=format_checker
    )
    return json_data
def __init__(self, name):
    """Look up the schema registered under *name* and build its validator.

    The schema comes from ``schemas.SCHEMAS.get(name)`` and is passed to
    ``Draft4Validator`` as-is, together with a ``FormatChecker``.

    :param name: key into ``schemas.SCHEMAS``; also stored as ``self.name``.
    """
    self.name = name
    self.schema = schemas.SCHEMAS.get(name)
    self.validator = jsonschema.Draft4Validator(
        self.schema, format_checker=jsonschema.FormatChecker())
def _validate(data, schema):
    """Return the validation errors for *data*, ordered by error path.

    :param data: instance to validate.
    :param schema: JSON Schema handed to ``Draft4Validator``.
    :returns: list of validation errors sorted on their ``path`` attribute;
        empty when *data* is valid.
    """
    checker = Draft4Validator(schema, format_checker=FormatChecker())
    found = list(checker.iter_errors(data))
    found.sort(key=lambda problem: problem.path)
    return found
def validate(self, json, schema_name):
    """Validate *json* against the schema registered under *schema_name*.

    :param json: the data to validate
    :param schema_name: key of the schema inside ``self.schema``
    :return: ``(True, err)`` with every field of ``err`` empty when the data
        is valid; ``(False, err)`` otherwise, where::

            err = {
                "key_name": "",   # last path component, dots removed
                "key_path": "",   # dot-joined path of the parent
                "msg": ""         # validation error message
            }

    :rtype: (bool, dict)
    :raises SchemaError: if the registered schema itself is invalid
    :raises ValueError: if a ``KeyError`` is raised, i.e. *schema_name* is
        not present in ``self.schema``
    """
    err = {
        "key_name": "",
        "key_path": "",
        "msg": ""
    }
    try:
        jsonschema.validate(json, self.schema[schema_name],
                            format_checker=jsonschema.FormatChecker())
        return True, err
    except ValidationError as e:
        # Path components are object keys or integer array indices; turn
        # each into text first so the string operations below cannot fail
        # on an index ("%s" keeps unicode keys intact on Python 2).
        parts = ["%s" % (part,) for part in e.path]
        if parts:
            err['key_name'] = parts.pop().replace(".", "")
        # Join the remaining (parent) components directly instead of
        # post-processing the repr of a list.
        err['key_path'] = ".".join(parts)
        err['msg'] = e.message
        return False, err
    except SchemaError:
        raise SchemaError("invalid schema in validator[%s], please checking" % schema_name)
    except KeyError:
        raise ValueError("not found schema name [%s] in schema config" % schema_name)
def validate(payload, schema):
    """Validate `payload` against `schema`, returning an error list.

    jsonschema errors carry a lot of detail; this flattens each one into a
    single string of the form ``<message> at /<path/to/element>``, where the
    path is built from the error's ``absolute_path``.

    :returns: list of error strings, empty when the payload is valid.
    """
    checker = jsonschema.FormatChecker()
    draft4 = jsonschema.Draft4Validator(schema, format_checker=checker)

    def describe(problem):
        # Render one validation error as "<message> at /<path>".
        text = problem.message
        pointer = '/' + '/'.join(str(step) for step in problem.absolute_path)
        return text + ' at ' + pointer

    return [describe(problem) for problem in draft4.iter_errors(payload)]
def __init__(self, schema, relax_additional_properties=False):
    """Build a validator for *schema* with custom keyword handling.

    ``self.validator_org`` is extended so that ``minimum`` and ``maximum``
    are handled by this object's own methods.

    :param schema: JSON Schema to validate against.
    :param relax_additional_properties: when true, ``additionalProperties``
        is handled by ``_soft_validate_additional_properties`` instead of
        the stock implementation.
    """
    keyword_overrides = dict(
        minimum=self._validate_minimum,
        maximum=self._validate_maximum,
    )
    if relax_additional_properties:
        keyword_overrides['additionalProperties'] = (
            _soft_validate_additional_properties)

    extended_cls = jsonschema.validators.extend(
        self.validator_org, keyword_overrides)
    self.validator = extended_cls(schema, format_checker=FormatChecker())
def handle_batch(self, messages, schema, key_names, bookmark_names=None): # pylint: disable=no-self-use,unused-argument
    '''Validate every record message in the batch against schema.

    Both the schema and each record are passed through float_to_decimal
    before validation. Messages that are not singer.RecordMessage
    instances are skipped.

    Raises TargetStitchException when a record lacks one of key_names;
    validation failures propagate from the validator.
    '''
    batch_validator = Draft4Validator(
        float_to_decimal(schema), format_checker=FormatChecker())

    for index, message in enumerate(messages):
        if not isinstance(message, singer.RecordMessage):
            continue

        record = float_to_decimal(message.record)
        batch_validator.validate(record)

        # A falsy key_names (None or empty) means there is nothing to check.
        for key in key_names or ():
            if key not in record:
                raise TargetStitchException(
                    'Message {} is missing key property {}'.format(
                        index, key))

    LOGGER.info('Batch is valid')
def test_fetch_license(self):
    """Fetch the license linked from the status document and validate it.

    Steps:
      1. find the link with ``rel == 'license'`` in ``self.lsd['links']``;
      2. download it and store the parsed JSON in ``self.lcpl``;
      3. validate it against ``lcpl_schema.json`` from
         ``JSON_SCHEMA_DIR_PATH``;
      4. log a few values of the license.

    Raises TestSuiteRunningError when the link is missing, the response is
    not JSON, or the license fails schema validation.
    """
    try:
        license_link = next(
            link for link in self.lsd['links'] if link['rel'] == 'license')
    except StopIteration:
        raise TestSuiteRunningError(
            "Missing a 'license' link in the status document")

    license_url = license_link['href']
    LOGGER.debug("Fetch license at url %s:", license_url)

    # Fetch the license.
    response = requests.get(license_url)
    try:
        self.lcpl = response.json()
    except ValueError:
        # Log the raw body to help diagnose the malformed document.
        LOGGER.debug(response.text)
        raise TestSuiteRunningError("Malformed JSON License Document")
    LOGGER.debug("The License is available")

    # Validate the license.
    schema_path = os.path.join(JSON_SCHEMA_DIR_PATH, 'lcpl_schema.json')
    with open(schema_path) as schema_file:
        lcpl_schema = json.load(schema_file)

    try:
        jsonschema.validate(self.lcpl, lcpl_schema,
                            format_checker=jsonschema.FormatChecker())
    except jsonschema.ValidationError as err:
        raise TestSuiteRunningError(err)
    LOGGER.debug("The up to date License is available and valid")

    # Display some license values.
    lcpl = self.lcpl
    LOGGER.info("date issued {}, updated {}".format(
        lcpl['issued'], lcpl.get('updated', "never")))

    rights = lcpl['rights']
    LOGGER.info("rights print {}, copy {}".format(
        rights['print'], rights['copy']))
    LOGGER.info("rights start {}, end {}".format(
        rights.get('start', "none"), rights.get('end', "none")))