我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用jsonschema.ValidationError()。
def validate(self, *args, **kwargs): try: self.validator.validate(*args, **kwargs) except jsonschema.ValidationError as ex: if len(ex.path) > 0: if self.is_body: detail = _("Invalid input for field '%(path)s'." "Value: '%(value)s'. %(message)s") else: detail = _("Invalid input for query parameters " "'%(path)s'. Value: '%(value)s'. %(message)s") detail = detail % { 'path': ex.path.pop(), 'value': ex.instance, 'message': six.text_type(ex) } else: detail = six.text_type(ex) raise exception.SchemaValidationError(detail=detail)
def to_table_data(self): """ :raises ValueError: :raises pytablereader.error.ValidationError: """ self._validate_source_data() attr_name_set = set() for json_record in self._buffer: attr_name_set = attr_name_set.union(six.viewkeys(json_record)) self._loader.inc_table_count() yield TableData( table_name=self._make_table_name(), header_list=sorted(attr_name_set), record_list=self._buffer, quoting_flags=self._loader.quoting_flags)
def to_table_data(self): """ :raises ValueError: :raises pytablereader.error.ValidationError: """ self._validate_source_data() self._loader.inc_table_count() header_list = sorted(six.viewkeys(self._buffer)) yield TableData( table_name=self._make_table_name(), header_list=header_list, record_list=zip( *[self._buffer.get(header) for header in header_list]), quoting_flags=self._loader.quoting_flags)
def to_table_data(self): """ :raises ValueError: :raises pytablereader.error.ValidationError: """ self._validate_source_data() for table_key, json_record_list in six.iteritems(self._buffer): attr_name_set = set() for json_record in json_record_list: attr_name_set = attr_name_set.union(six.viewkeys(json_record)) self._loader.inc_table_count() self._table_key = table_key yield TableData( table_name=self._make_table_name(), header_list=sorted(attr_name_set), record_list=json_record_list, quoting_flags=self._loader.quoting_flags)
def to_table_data(self): """ :raises ValueError: :raises pytablereader.error.ValidationError: """ self._validate_source_data() for table_key, json_record_list in six.iteritems(self._buffer): header_list = sorted(six.viewkeys(json_record_list)) self._loader.inc_table_count() self._table_key = table_key yield TableData( table_name=self._make_table_name(), header_list=header_list, record_list=zip( *[json_record_list.get(header) for header in header_list]), quoting_flags=self._loader.quoting_flags)
def __init__(self, path=None): self.path = path or os.getenv('WEBHDFS_CONFIG', self.default_path) if osp.exists(self.path): try: self.config = json.loads(open(self.path).read()) self.schema = json.loads(resource_string(__name__, 'resources/config_schema.json')) #self.schema = open("resources/schema.config").read() try: js.validate(self.config, self.schema) except js.ValidationError as e: print e.message except js.SchemaError as e: print e except ParsingError: raise HdfsError('Invalid configuration file %r.', self.path) _logger.info('Instantiated configuration from %r.', self.path) else: raise HdfsError('Invalid configuration file %r.', self.path)
def schema_request(media_type_name): def wrapper(fn): @wraps(fn) def decorated(*args, **kwargs): if isinstance(media_type_name, basestring): schema = get_schema_for_media_type(media_type_name) else: schema = media_type_name json_object = request.get_json() try: validate(json_object, schema) except ValidationError as e: report = generate_validation_error_report(e, json_object) abort(400, description=report) return fn(*args, **kwargs) return decorated return wrapper
def validate_service_definitions(components_map, components=None): if not components: components = components_map.keys() else: validation_base.validate_components_names(components, components_map) not_passed_components = set() for component in components: try: jsonschema.validate(components_map[component]["service_content"], SERVICE_SCHEMA, format_checker=ServiceFormatChecker()) except jsonschema.ValidationError as e: LOG.error("Validation of service definitions for component '%s' " "is not passed: '%s'", component, e.message) not_passed_components.add(component) if not_passed_components: raise RuntimeError( "Validation of service definitions for {} of {} components is " "not passed.".format(len(not_passed_components), len(components)) ) else: LOG.info("Service definitions validation passed successfully")
def __call__(self, environ, start_response): code = self.code content = self.content request = Request(environ) self.requests.append(request) data = request.data if request.content_encoding == 'deflate': data = zlib.decompress(data) data = data.decode(request.charset) if request.content_type == 'application/json': data = json.loads(data) self.payloads.append(data) validator = VALIDATORS.get(request.path, None) if validator and not self.skip_validate: try: validator.validate(data) code = 202 except jsonschema.ValidationError as e: code = 400 content = json.dumps({'status': 'error', 'message': str(e)}) response = Response(status=code) response.headers.clear() response.headers.extend(self.headers) response.data = content return response(environ, start_response)
def validate_config(fname): """ Validate configuration file in json format. """ # Load schema schema_fname = pkg_resources.resource_filename('export2hdf5', "config_schema.json") schema = load_json_file(schema_fname) config = load_json_file(fname) res = None try: jsonschema.validate(config, schema) except jsonschema.ValidationError as e: res = e.message except jsonschema.SchemaError as e: res = e return res
def execute_plan(self, name, planner_config, attack_config): """ Execute a plan with a planner and executor config to create executors based on the configs It also validates the planner and executor config against the modules :param name: Plan name :param planner_config: Dict with planner config :param attack_config: Dict with attack config """ try: planner_class = self._planners_store.get(planner_config.get("ref")) attack_class = self._attacks_store.get(attack_config.get("ref")) except ModuleLookupError as e: raise APIError("invalid planner %s" % e.message) # Validate both executor and planner configs try: validate(planner_config.get("args"), planner_class.schema) validate(attack_config, attack_class.schema) except ValidationError as e: raise APIError("invalid payload %s" % e.message) planner = planner_class(name) planner.plan(planner_config, attack_config)
def validate_payload(request, schema): """ validates a request payload against a json schema :param request: request received with valid json body :param schema: schema to validate the request payload :return: True :raises: :meth:`chaosmonkey.api.api_errors` """ try: json = request.get_json() validate(json, schema) except ValidationError as e: raise APIError("invalid payload %s" % e.message) except Exception: raise APIError("payload must be a valid json") else: return True
def validate(install_json): """Validate install.json file for required parameters""" # install.json validation try: with open(install_json) as fh: data = json.loads(fh.read()) validate(data, schema) print('{} is valid'.format(install_json)) except SchemaError as e: print('{} is invalid "{}"'.format(install_json, e)) except ValidationError as e: print('{} is invalid "{}"'.format(install_json, e)) # @staticmethod # def _wrap(data): # """Wrap any parameters that contain spaces # # Returns: # (string): String containing parameters wrapped in double quotes # """ # if len(re.findall(r'[!\-\s\$]{1,}', data)) > 0: # data = '"{}"'.format(data) # return data
def check_schema(document): if type(document) != dict: LOG.error('Non-dictionary document passed to schema validation.') return schema_name = document.get('schema', '<missing>') LOG.debug('Validating schema for schema=%s metadata.name=%s', schema_name, document.get('metadata', {}).get('name', '<missing>')) if schema_name in SCHEMAS: try: jsonschema.validate(document.get('data'), SCHEMAS[schema_name]) except jsonschema.ValidationError as e: raise exceptions.ValidationException(str(e)) else: LOG.warning('Skipping validation for unknown schema: %s', schema_name)
def _validate_schema(schema, body): """Validate data against a schema""" # Note # # Schema validation is currently the major CPU bottleneck of # BigchainDB. the `jsonschema` library validates python data structures # directly and produces nice error messages, but validation takes 4+ ms # per transaction which is pretty slow. The rapidjson library validates # much faster at 1.5ms, however it produces _very_ poor error messages. # For this reason we use both, rapidjson as an optimistic pathway and # jsonschema as a fallback in case there is a failure, so we can produce # a helpful error message. try: schema[1].validate(rapidjson.dumps(body)) except ValueError as exc: try: jsonschema.validate(body, schema[0]) except jsonschema.ValidationError as exc2: raise SchemaValidationError(str(exc2)) from exc2 logger.warning('code problem: jsonschema did not raise an exception, wheras rapidjson raised %s', exc) raise SchemaValidationError(str(exc)) from exc
def validate(self, data): name = data.get('name') config = data.get('config', {}) if not name or name not in plugins: raise serializers.ValidationError('Invalid plugin name') plugin_schema = plugins[name] if self.instance: initial_config = self.instance.config initial_config.update(config) config = initial_config try: jsonschema.validate(config, plugin_schema) except jsonschema.ValidationError as e: raise serializers.ValidationError({'config': e}) plugin_validators = validators.validator_classes.get(name, []) for validator in plugin_validators: validate = validator(data['config']) validate() return data
def test_unsuccessful_validation(self): error = ValidationError("I am an error!", instance=1) stdout, stderr = StringIO(), StringIO() exit_code = cli.run( { "validator": fake_validator([error]), "schema": {}, "instances": [1], "error_format": "{error.instance} - {error.message}", }, stdout=stdout, stderr=stderr, ) self.assertFalse(stdout.getvalue()) self.assertEqual(stderr.getvalue(), "1 - I am an error!") self.assertEqual(exit_code, 1)
def test_unsuccessful_validation_multiple_instances(self): first_errors = [ ValidationError("9", instance=1), ValidationError("8", instance=1), ] second_errors = [ValidationError("7", instance=2)] stdout, stderr = StringIO(), StringIO() exit_code = cli.run( { "validator": fake_validator(first_errors, second_errors), "schema": {}, "instances": [1, 2], "error_format": "{error.instance} - {error.message}\t", }, stdout=stdout, stderr=stderr, ) self.assertFalse(stdout.getvalue()) self.assertEqual(stderr.getvalue(), "1 - 9\t1 - 8\t2 - 7\t") self.assertEqual(exit_code, 1)
def put(self, section_name): if not self.settings_dir: raise web.HTTPError(404, "No current settings directory") raw = self.request.body.strip().decode(u"utf-8") # Validate the data against the schema. schema = _get_schema(self.schemas_dir, section_name, self.overrides) validator = Validator(schema) try: validator.validate(json.loads(json_minify(raw))) except ValidationError as e: raise web.HTTPError(400, str(e)) # Write the raw data (comments included) to a file. path = _path(self.settings_dir, section_name, _file_extension, True) with open(path, "w") as fid: fid.write(raw) self.set_status(204)
def test_it_delegates_to_a_legacy_ref_resolver(self): """ Legacy RefResolvers support only the context manager form of resolution. """ class LegacyRefResolver(object): @contextmanager def resolving(this, ref): self.assertEqual(ref, "the ref") yield {"type" : "integer"} resolver = LegacyRefResolver() schema = {"$ref" : "the ref"} with self.assertRaises(ValidationError): self.validator_class(schema, resolver=resolver).validate(None)
def test_validate_lcpl(self): """ Validate an LCP license file """ # validate the license using the JSON schema, includes: # check the profile Value (basic or 1.0) # check the encryption method (aes-cbc), user key (sha256) and signature algorithm (ecdsa-sha256) lcpl_json_schema_path = os.path.join( JSON_SCHEMA_DIR_PATH, 'lcpl_schema.json') with open(lcpl_json_schema_path) as schema_file: lcpl_json_schema = json.loads(schema_file.read()) try: jsonschema.validate(self.lcpl, lcpl_json_schema) except jsonschema.ValidationError as err: raise TestSuiteRunningError(err)
def validate_schema(self): """ Method to validate the JSON data against the schema Args: Returns: str: An error message if schema validation fails """ if not self.schema: raise ValueError("Schema has not been populated yet. Cannot validate.") try: jsonschema.validate(self.config, self.schema) except jsonschema.ValidationError as e: return e return None
def validate(evt, evttype): """Validates the content of an event against the JSON schema of its type. Args: evt (:obj:`baroque.entities.event.Event`): the event to be validated evttype (:obj:`baroque.entities.eventtype.EventType`): the type of the event that needs to be validated Returns: ``True`` if validation is OK, ``False`` otherwise """ try: assert evt.type == evttype check(loads(evt.json()), loads(evttype.jsonschema)) return True except (AssertionError, ValidationError): return False
def consume(self, value): if isinstance(value, list): value = {item["id"]: item["value"] for item in value} try: self.validator.validate(value) except jsonschema.ValidationError as exc: LOG.warning("Cannot validate hints: %s", exc) raise ValueError("Cannot validate hints") from exc values = {} for key, schema_value in self.schema.items(): default_value = schema_value.get("default_value", None) values[key] = value.get(key, default_value) return values
def generate_swagger(self): """ conversion of the raml info into swagger :return: """ try: parse_tree = ramlparser.load(self.inputname) except ValidationError as e: print ('validation error:', e.errors) print ("could not load file: error loading file") traceback.print_exc() return title = self.get_first_display_name(parse_tree) version = parse_tree.version self.swag_openfile(version, title) self.swag_add_resource(parse_tree) self.swag_add_generic_parameters(parse_tree) self.swag_add_definitions(parse_tree) self.swag_closefile() print ("swagger document saved..", self.swagger) self.swag_verify()
def check_schema(body, schema): """Ensure all necessary keys are present and correct in create body. Check that the user-specified create body is in the expected format and include the required information. :param body: create body :raises InvalidParameterValue: if validation of create body fails. """ validator = jsonschema.Draft4Validator( schema, format_checker=jsonschema.FormatChecker()) try: validator.validate(body) except jsonschema.ValidationError as exc: raise exception.InvalidParameterValue(_('Invalid create body: %s') % exc)
def validate(schema): def decorator(func): def wrapper(self, req, resp, *args, **kwargs): try: raw_json = req.stream.read() obj = json.loads(raw_json.decode('utf-8')) except Exception: raise falcon.HTTPBadRequest( 'Invalid data', 'Could not properly parse the provided data as JSON' ) try: jsonschema.validate(obj, schema) except jsonschema.ValidationError as e: raise falcon.HTTPBadRequest( 'Failed data validation', e.message ) return func(self, req, resp, *args, parsed=obj, **kwargs) return wrapper return decorator
def validate(self, descriptor, schema_id): """ Validate a descriptor against a schema template :param descriptor: :param schema_id: :return: """ try: jsonschema.validate(descriptor, self.load_schema(schema_id)) return True except ValidationError as e: log.error("Failed to validate Descriptor against schema '{}'" .format(schema_id)) self.error_msg = e.message log.error(e.message) return except SchemaError as e: log.error("Invalid Schema '{}'".format(schema_id)) self.error_msg = e.message log.debug(e) return
def resolve_dependencies(args): """Main function to handle the dependencies from the config file. The dependencies will be downloaded from the specified S3 repository and will be placed in the location provided for the specific dependency from the configuration file. :param args: The user's arguments supplied from the main function. """ try: config_file = args.config with open(config_file, 'r') as json_file: dependencies_data = json.load(json_file) validate_schema(dependencies_data) validate_data(dependencies_data) download_dependencies(dependencies_data) sys.exit() except IOError: handle_exception('File could not be opened') except ValueError: handle_exception('File could not be parsed') except ValidationError: handle_exception('File could not be validated') except Exception: handle_exception('Unknown error occurred')
def is_valid(self, data: Dict, schema_name: str) -> bool: """Validates data with JSON schemes and returns result. Args: data: The data. schema_name: The name of the schemes used for validation. Returns: True if data is valid, False if not. """ if not self.has_schema(schema_name): self.logger.warning('JSON schemes "{}" not found' .format(schema_name)) return False try: schema = self._schema.get(schema_name) jsonschema.validate(data, schema) except jsonschema.ValidationError: return False return True
def custom_check_card(self, card): validations = [] #check foreing codes for collection in ["affiliation", "faction", "rarity", "type", "subtype"]: field = collection + "_code" if field in card and not card.get(field) in self.collections[collection]: validations.append("%s code '%s' does not exists in card '%s'" % (collection, card.get(field), card.get('code'))) #check reprint of if 'reprint_of' in card and not card.get('reprint_of') in self.collections['card']: validations.append("Reprinted card %s does not exists" % (card.get('reprint_of'))) #checks by type check_by_type_method = "custom_check_%s_card" % card.get('type_code') if hasattr(self, check_by_type_method) and callable(getattr(self, check_by_type_method)): validations.extend(getattr(self, check_by_type_method)(card)) if validations: raise jsonschema.ValidationError("\n".join(["- %s" % v for v in validations]))
def validate(self, *args, **kwargs): try: self.validator.validate(*args, **kwargs) except jsonschema.ValidationError as ex: if isinstance(ex.cause, exception.InvalidName): detail = ex.cause.format_message() elif len(ex.path) > 0: detail = _("Invalid input for field/attribute %(path)s." " Value: %(value)s. %(message)s") % { 'path': ex.path.pop(), 'value': ex.instance, 'message': ex.message } else: detail = ex.message raise exception.ValidationError(detail=detail) except TypeError as ex: # NOTE: If passing non string value to patternProperties parameter, # TypeError happens. Here is for catching the TypeError. detail = six.text_type(ex) raise exception.ValidationError(detail=detail)
def test_validate_raises_if_invalid_data(): data = { 'foo': 'bar', } schema = { '$schema': 'http://json-schema.org/schema#', 'type': 'object', 'properties': { 'foo': { 'type': 'integer' } } } with pytest.raises(ValidationError): utils.validate(data, schema)
def _validate_logic(self, doc): doc_name = next(iter(doc)) # ????????, ??? ???? ???? ?? ?????? ????????? ???? doc_timestamp = doc[doc_name].get('dateTime') if self.min_date and self.min_date.timestamp() > doc_timestamp: doc_date = datetime.datetime.fromtimestamp(doc_timestamp) raise ValidationError('Document timestamp ' + str(doc_date) + ' is less than min. allowed date ' + str(self.min_date)) # ????????, ??? ??? ????? ???? "?? ????????" ?????? ?? 24 ???? ?????? UTC future = datetime.datetime.utcnow() + datetime.timedelta(hours=self.future_hours) if doc_timestamp > future.timestamp(): doc_date = datetime.datetime.fromtimestamp(doc_timestamp) raise ValidationError('Document timestamp {} is greater than now for {} hours' .format(str(doc_date), str(self.future_hours)))
def _build_object(cls, value, schema, nested_types, input_): if 'object' in nested_types: raise ModelBaseError('nested object was not allowed', input_=input_) properties = value.split('|') dict_obj = dict() nested_types.add('object') for prop in properties: key, value = prop.split(':') prop_schema = schema['properties'].get(key) if prop_schema is None: raise ValidationError("Invalid property '{}'".format(key), instance=input_, schema=schema) dict_obj[key] = \ cls._build_value(value, prop_schema, nested_types, input_) nested_types.discard('object') return dict_obj
def put_by_uri_template(cls, req, resp): session, req_body, id_, kwargs = cls._get_context_values(req.context) req_body_copy = deepcopy(req_body) cls._update_dict(req_body, id_) objs = cls.update(session, req_body, ids=id_, **kwargs) if not objs: req_body = req_body_copy ambigous_keys = [ kwa for kwa in id_ if kwa in req_body and req_body[kwa] != id_[kwa]] if ambigous_keys: body_schema = req.context.get('body_schema') raise ValidationError( "Ambiguous value for '{}'".format( "', '".join(ambigous_keys)), instance={'body': req_body, 'uri': id_}, schema=body_schema) req.context['parameters']['body'] = req_body cls._insert(req, resp, with_update=True) else: resp.body = json.dumps(objs[0])
def __prepare_json(status, op=None, schema=None, data=None, errors=None): """Prepare json structure for returning.""" ret_json = {"status": status} if errors: if not isinstance(errors, list): ret_json["errors"] = [errors] else: ret_json["errors"] = errors if data: ret_json["data"] = data if op: op_schema = _get_pkg_output_schema(op) try: jsonschema.validate(ret_json, op_schema) except jsonschema.ValidationError as e: newret_json = {"status": EXIT_OOPS, "errors": [{"reason": str(e)}]} return newret_json if schema: ret_json["schema"] = schema return ret_json
def validate(self, *args, **kwargs): try: self.validator.validate(*args, **kwargs) except jsonschema.ValidationError as ex: if isinstance(ex.cause, exception.InvalidName): detail = ex.cause.format_message() elif len(ex.path) > 0: # NOTE: For whole OpenStack message consistency, this error # message has been written as the similar format of WSME. detail = _("Invalid input for field/attribute %(path)s." " Value: %(value)s. %(message)s") % { 'path': ex.path.pop(), 'value': ex.instance, 'message': ex.message } else: detail = ex.message raise exception.ValidationError(detail=detail) except TypeError as ex: # NOTE: If passing non string value to patternProperties parameter, # TypeError happens. Here is for catching the TypeError. detail = six.text_type(ex) raise exception.ValidationError(detail=detail)