我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 jsonschema.validate()。
def validate(self, task, method, http_method, **kwargs):
    """Validates the vendor method's parameters.

    Dispatches to the statistics- or policy-specific validator based on
    the vendor method name, converting schema failures into parameter
    errors.

    :param task: a TaskManager instance.
    :param method: name of vendor method.
    :param http_method: HTTP method.
    :param kwargs: data passed to vendor's method.
    :raises: InvalidParameterValue if supplied data is not valid.
    :raises: MissingParameterValue if parameters missing in supplied data.
    """
    validator = (self._validate_statistics_methods
                 if 'statistics' in method
                 else self._validate_policy_methods)
    try:
        validator(method, **kwargs)
    except json_schema_exc.ValidationError as e:
        raise exception.InvalidParameterValue(_('Input data validation '
                                                'error: %s') % e)
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels
    import jsonschema

    def open_json(filename):
        # Context manager closes the handle promptly (the original
        # leaked the file object returned by open()).
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    # Close each wheel archive after reading its entries.
                    with ZipFile(os.path.join(dirname, filename)) as whl:
                        for entry in whl.infolist():
                            if entry.filename.endswith('/metadata.json'):
                                pymeta = json.loads(
                                    whl.read(entry).decode('utf-8'))
                                jsonschema.validate(pymeta, pymeta_schema)
                                valid += 1
    assert valid > 0, "No metadata.json found"
def test_pydist():
    """Make sure pydist.json exists and validates against our schema."""
    # XXX this test may need manual cleanup of older wheels
    import jsonschema

    def open_json(filename):
        with open(filename, 'rb') as json_file:
            return json.loads(json_file.read().decode('utf-8'))

    pymeta_schema = open_json(resource_filename('wheel.test',
                                                'pydist-schema.json'))
    valid = 0
    for dist in ("simple.dist", "complex-dist"):
        basedir = pkg_resources.resource_filename('wheel.test', dist)
        for (dirname, subdirs, filenames) in os.walk(basedir):
            for filename in filenames:
                if filename.endswith('.whl'):
                    # Close each wheel archive after use (the original
                    # left every ZipFile open until GC).
                    with ZipFile(os.path.join(dirname, filename)) as whl:
                        for entry in whl.infolist():
                            if entry.filename.endswith('/metadata.json'):
                                pymeta = json.loads(
                                    whl.read(entry).decode('utf-8'))
                                jsonschema.validate(pymeta, pymeta_schema)
                                valid += 1
    assert valid > 0, "No metadata.json found"
def validate_analysis_result(context, ecosystem, package, version):
    """Validate results of the analysis."""
    res = context.response.json()
    # make sure analysis has finished
    assert res['finished_at'] is not None

    # we want to validate top-level analysis and worker results that have
    # "schema" defined
    structures_to_validate = [res]
    for worker_result in res['analyses'].values():
        # TODO: in future we want to mandate that all workers have their
        # schemas, so we'll remove the condition
        if 'schema' in worker_result:
            structures_to_validate.append(worker_result)

    for struct in structures_to_validate:
        # requests has no default timeout; without one a stuck schema
        # server would hang the test suite indefinitely.
        schema = requests.get(struct['schema']['url'], timeout=60).json()
        jsonschema.validate(struct, schema)
def __init__(self, path=None):
    """Load and validate the WebHDFS JSON configuration file.

    :param path: path to the configuration file; falls back to the
        ``WEBHDFS_CONFIG`` environment variable, then ``default_path``.
    :raises HdfsError: if the file does not exist or cannot be parsed.
    """
    self.path = path or os.getenv('WEBHDFS_CONFIG', self.default_path)
    if osp.exists(self.path):
        try:
            # Context manager closes the handle (the original leaked it).
            with open(self.path) as config_file:
                self.config = json.loads(config_file.read())
            self.schema = json.loads(
                resource_string(__name__, 'resources/config_schema.json'))
            # self.schema = open("resources/schema.config").read()
            try:
                js.validate(self.config, self.schema)
            except js.ValidationError as e:
                # print() calls replace the Python-2-only print
                # statements, which are a SyntaxError on Python 3.
                print(e.message)
            except js.SchemaError as e:
                print(e)
        except ParsingError:
            raise HdfsError('Invalid configuration file %r.', self.path)
        _logger.info('Instantiated configuration from %r.', self.path)
    else:
        raise HdfsError('Invalid configuration file %r.', self.path)
def _example_api(request, schema, example):
    """Validate the request body against ``schema`` (when given) and
    return ``example`` (or None when there is none)."""
    if schema:
        # If there is a problem with the json data, return a 400.
        try:
            payload = json.loads(request.body.decode("utf-8"))
            validate(payload, schema)
        except Exception as e:
            custom_handler = getattr(
                settings, 'RAMLWRAP_VALIDATION_ERROR_HANDLER', None)
            error_response = (_call_custom_handler(e) if custom_handler
                              else _validation_error_handler(e))
            if error_response:
                return error_response

    return example if example else None
def _is_valid_query(params, expected_params): """ Function to validate get request params. """ # If expected params, check them. If not, pass. if expected_params: for param in expected_params: # If the expected param is in the query. if param in params: for check, rule in expected_params[param].__dict__.items(): if rule is not None: error_message = "QueryParam [%s] failed validation check [%s]:[%s]" % (param, check, rule) if check == "minLength": if len(params.get(param)) < rule: raise ValidationError(error_message) elif check == "maxLength": if len(params.get(param)) > rule: raise ValidationError(error_message) # Isn't in the query but it is required, throw a validation exception. elif expected_params[param].required is True: raise ValidationError("QueryParam [%s] failed validation check [Required]:[True]" % param) # TODO Add more checks here. return True
async def test_validate_fullfledged(test_client, loop):
    """End-to-end check: the generated swagger.json passes schema validation.

    ``async def`` is required here — the body awaits the aiohttp test
    client; a plain ``def`` containing ``await`` is a SyntaxError.
    """
    app = web.Application(loop=loop)
    app.router.add_get('/', full_fledged_handler)
    app.router.add_post('/', full_fledged_handler)
    app = swaggerify(
        app,
        basePath="/",
        host="127.0.0.1:8080"
    )
    client = await test_client(app)
    resp = await client.get('/swagger.json')
    assert resp.status == 200
    text = await resp.json()
    with open("tests/validate_swagger.json", "r") as fp:
        assert schema_validate(text, json.load(fp)) is None
def validate_input(self, formData):
    """Reject any value that is not a ``#RRGGBB`` hex colour string."""
    hex_colour_schema = {
        'type': 'string',
        'pattern': '^#([A-Fa-f0-9]{6})$',
    }
    validate(formData, hex_colour_schema)

# TODO: this is for rgb triple
# def validate_input(self, formData):
#     schema = {
#         'type': 'array',
#         'items': {
#             'type': 'number',
#             'minimum': 0,
#             'maximum': 1
#         },
#         'minItems': 3,
#         'maxItems': 3
#     }
#     validate(formData, schema)
def get_schemas(cls, doc):
    """Retrieve the relevant schema based on the document's ``schema``.

    :param dict doc: The document used for finding the correct schema
        to validate it based on its ``schema``.
    :returns: A schema to be used by ``jsonschema`` for document
        validation.
    :rtype: dict
    """
    cls._register_data_schemas()

    # FIXME(fmontei): Remove this once all Deckhand tests have been
    # refactored to account for dynamic schema registeration via
    # ``DataSchema`` documents. Otherwise most tests will fail.
    for schema_field in (doc['schema'], doc['metadata']['schema']):
        found = cls._get_schema_by_property(cls.schema_re, schema_field)
        if found:
            return found
    return []
def check_schemas(data_root, schemas_dir, verbose=False):
    """Validate category/video JSON files; return the number of failures."""
    schema_names = ('category.json', 'video.json')
    error_count = 0
    for schema_name, file_paths in zip(schema_names,
                                       get_json_files(data_root)):
        with open(os.path.join(schemas_dir, schema_name),
                  encoding='UTF-8') as fp:
            schema_blob = json.load(fp)
        for file_path in file_paths:
            with open(file_path, encoding='UTF-8') as fp:
                blob = json.load(fp)
            try:
                jsonschema.validate(blob, schema_blob)
            except jsonschema.exceptions.ValidationError as exc:
                print(file_path, flush=True)
                if verbose:
                    print(exc, flush=True)
                error_count += 1
    return error_count
def _test_validate(self, schema, expect_failure, input_files, input):
    """validates input yaml against schema.

    :param schema: schema yaml file
    :param expect_failure: should the validation pass or fail.
    :param input_files: pytest fixture used to access the test input files
    :param input: test input yaml doc filename"""
    schema_dir = pkg_resources.resource_filename('drydock_provisioner',
                                                 'schemas')
    schema_filename = os.path.join(schema_dir, schema)
    # Context managers close both handles (the original never closed
    # either file).
    with open(schema_filename, 'r') as schema_file:
        schema = yaml.safe_load(schema_file)

    input_file = input_files.join(input)
    with open(str(input_file), 'r') as instance_file:
        instance = yaml.safe_load(instance_file)

    if expect_failure:
        with pytest.raises(ValidationError):
            jsonschema.validate(instance['spec'], schema['data'])
    else:
        jsonschema.validate(instance['spec'], schema['data'])
def validate_json_schema(data, schema, name="task"):
    """Given data and a jsonschema, let's validate it.

    This happens for tasks and chain of trust artifacts.

    Args:
        data (dict): the json to validate.
        schema (dict): the jsonschema to validate against.
        name (str, optional): the name of the json, for exception
            messages.  Defaults to "task".

    Raises:
        ScriptWorkerTaskException: on failure
    """
    try:
        jsonschema.validate(data, schema)
    except jsonschema.exceptions.ValidationError as exc:
        message = "Can't validate {} schema!\n{}".format(name, str(exc))
        raise ScriptWorkerTaskException(
            message, exit_code=STATUSES['malformed-payload'])
def validate_service_definitions(components_map, components=None):
    """Schema-validate the service definition of each requested component.

    Raises RuntimeError listing how many components failed; logs success
    otherwise.
    """
    if components:
        validation_base.validate_components_names(components, components_map)
    else:
        components = components_map.keys()

    failed = set()
    for name in components:
        try:
            jsonschema.validate(components_map[name]["service_content"],
                                SERVICE_SCHEMA,
                                format_checker=ServiceFormatChecker())
        except jsonschema.ValidationError as e:
            LOG.error("Validation of service definitions for component '%s' "
                      "is not passed: '%s'", name, e.message)
            failed.add(name)

    if failed:
        raise RuntimeError(
            "Validation of service definitions for {} of {} components is "
            "not passed.".format(len(failed), len(components))
        )
    else:
        LOG.info("Service definitions validation passed successfully")
def get_config_schema():
    """Build the top-level JSON schema for the application configuration."""
    schema = {
        '$schema': 'http://json-schema.org/draft-04/schema#',
        'additionalProperties': False,
        'properties': {
            'debug': {'type': 'boolean'},
            'verbose_level': {'type': 'integer'},
            'log_file': {'anyOf': [{'type': 'null'}, {'type': 'string'}]},
            'default_log_levels': {'type': 'array',
                                   'items': {'type': 'string'}},
        },
    }
    properties = schema['properties']
    for module in CONFIG_MODULES:
        properties.update(module.SCHEMA)

    # Don't validate all options used to be added from oslo.log and
    # oslo.config
    for name in ('debug', 'verbose', 'log_file'):
        properties[name] = {}
    # Also for now don't validate sections that used to be in deploy config
    for name in ('configs', 'secret_configs', 'nodes', 'roles', 'versions'):
        properties[name] = {'type': 'object'}
    return schema
def removeImages(self, imgList):
    """
    Attempt to remove image metadata from the mongo database

    :param imgList: a list of docker image names
    :type imgList: a list of strings
    :raises DockerImageNotFoundError: if an image is not in the database
    :raises DockerImageError: if removal fails for any other reason
    """
    # Separate except clauses replace the original isinstance() dispatch
    # inside a broad handler; ``img_hash`` avoids shadowing the ``hash``
    # builtin.  Both names are bound before the loop so the handlers can
    # report which image failed even on an empty list.
    img = img_hash = None
    try:
        for img in imgList:
            img_hash = DockerImage.getHashKey(img)
            imageData = self._load(img_hash)
            super(DockerImageModel, self).remove(imageData.getRawData())
    except DockerImageNotFoundError:
        logger.exception('Image %r does not exist', img)
        raise DockerImageNotFoundError(
            'The image %s with hash %s does not exist '
            'in the database' % (img, img_hash), img)
    except Exception as err:
        logger.exception('Could not remove image %r', img)
        raise DockerImageError(
            'Could not delete the image data from the database '
            'invalid image: ' + img + ' ' + str(err), img)

# TODO validate the xml of each cli
def validate(self, doc):
    """Check cached docker-image metadata against the expected schema.

    Returns ``doc`` unchanged on success; raises DockerImageError on any
    failure.
    """
    try:
        # validate structure of cached data on docker image
        jsonschema.validate(doc, DockerImageStructure.ImageSchema)
        # check cli xml is correct
        #
        # loc=os.path.dirname(os.path.abspath(__file__))+'/ModuleDescription.xsd'
        # schemaFile = open(loc)
        #
        # schemaData = schemaFile.read()
        #
        # schema_doc = etree.parse(StringIO(schemaData))
        # schema = etree.XMLSchema(schema_doc)
        #
        # for (key, val) in iteritems(doc[DockerImage.cli_dict]):
        #     xml = val[DockerImage.xml]
        #     cli_xml = etree.parse(xml)
        #     schema.assertValid(cli_xml)
        #
        return doc
    except Exception as err:
        logger.exception('Image metadata failed to validate %r', doc)
        raise DockerImageError('Image metadata is invalid ' + str(err))
def network_driver_leave():
    """Unbinds a Neutron Port to a network interface attached to a container.

    This function takes the following JSON data and delete the veth pair
    corresponding to the given info. ::

        {
            "NetworkID": string,
            "EndpointID": string
        }
    """
    payload = flask.request.get_json(force=True)
    LOG.debug("Received JSON data %s for"
              " /NetworkDriver.Leave", payload)
    jsonschema.validate(payload, schemata.LEAVE_SCHEMA)
    return flask.jsonify(const.SCHEMA['SUCCESS'])
def _validate(self, session, input_):
    """Schema-validate this object's configuration for its strategy.

    :param session: database session (part of the interface, unused here).
    :param input_: the raw input that triggered validation; validation is
        skipped when it came from the engine model itself.
    :raises SwaggerItModelError: if ``self.type`` is not a valid object
        type for the strategy.
    :raises jsonschema.ValidationError: if the configuration is invalid.
    """
    # disable validation when the operation was did by the engine model
    if (isinstance(input_, list) and 'objects' in input_[0]) or \
            (isinstance(input_, dict) and 'objects' in input_):
        return

    strategy_class = self.strategy.get_class()
    if self.type not in strategy_class.object_types:
        raise SwaggerItModelError(
            "Invalid object type '{}'".format(self.type),
            instance=input_
        )

    object_schema = strategy_class.configuration_schema['properties'][self.type]
    if 'definitions' in strategy_class.configuration_schema:
        # Copy before injecting 'definitions': the original mutated the
        # sub-schema dict shared via strategy_class.configuration_schema,
        # leaking 'definitions' into every later use of that schema.
        object_schema = dict(object_schema)
        object_schema['definitions'] = \
            strategy_class.configuration_schema['definitions']
    jsonschema.validate(self.configuration, object_schema)
def validate_config(fname):
    """
    Validate configuration file in json format.

    Returns None on success; otherwise the validation error message or
    the schema error.
    """
    # Load schema
    schema_fname = pkg_resources.resource_filename('export2hdf5',
                                                   "config_schema.json")
    schema = load_json_file(schema_fname)
    config = load_json_file(fname)

    try:
        jsonschema.validate(config, schema)
    except jsonschema.ValidationError as err:
        return err.message
    except jsonschema.SchemaError as err:
        return err
    return None
def validate_payload(request, schema):
    """
    validates a request payload against a json schema

    :param request: request received with valid json body
    :param schema: schema to validate the request payload
    :return: True
    :raises: :meth:`chaosmonkey.api.api_errors`
    """
    try:
        # Local name 'body' avoids shadowing the json module.
        body = request.get_json()
        validate(body, schema)
    except ValidationError as e:
        raise APIError("invalid payload %s" % e.message)
    except Exception:
        raise APIError("payload must be a valid json")
    return True
def validation(schema):
    """function decorator: validate the request's JSON body against schema.

    The wrapped view receives the prepared JSON input as its first
    positional argument after ``self``.

    :raises BadRequest: if the body is not decodable, not valid JSON, or
        fails schema validation.
    """
    from functools import wraps

    def dec(func):
        @wraps(func)  # preserve the wrapped view's name and docstring
        def wrapper(self, *args, **kwargs):
            try:
                rawdata = request.data
                enc = chardet.detect(rawdata)
                data = rawdata.decode(enc['encoding'])
                json_input = json.loads(data)
                jsonschema.validate(json_input, schema)
                json_input = prepare_input(json_input)
            except Exception:
                # 'except Exception' instead of a bare 'except:' so
                # SystemExit/KeyboardInterrupt still propagate.
                raise BadRequest('JSON input not valid: {}'.format(format_exc()))
            return func(self, json_input, *args, **kwargs)
        return wrapper
    return dec
def get_config():
    """Get cached configuration.

    :returns: application config
    :rtype: dict
    """
    global CONF
    if not CONF:
        path = os.environ.get("AVAILABILITY_CONF",
                              "/etc/availability/config.json")
        try:
            # Context manager closes the handle (the original leaked the
            # file object passed to json.load()).
            with open(path) as config_file:
                config = json.load(config_file)
            logging.info("Config is '%s'" % path)
            jsonschema.validate(config, CONF_SCHEMA)
            CONF = config
        except IOError as exc:
            logging.warning("Failed to load config from '%s': %s", path, exc)
            CONF = DEFAULT_CONF
        except jsonschema.exceptions.ValidationError as exc:
            logging.error("Configuration file %s is not valid: %s", path, exc)
            raise
    return CONF
def validate(install_json):
    """Validate install.json file for required parameters"""
    # install.json validation
    try:
        with open(install_json) as fh:
            data = json.loads(fh.read())
        # Call jsonschema.validate explicitly: the original called the
        # module-level ``validate`` -- i.e. this very function -- with two
        # arguments, recursing with the wrong signature instead of
        # validating the data.
        import jsonschema
        jsonschema.validate(data, schema)
        print('{} is valid'.format(install_json))
    except SchemaError as e:
        print('{} is invalid "{}"'.format(install_json, e))
    except ValidationError as e:
        print('{} is invalid "{}"'.format(install_json, e))

# @staticmethod
# def _wrap(data):
#     """Wrap any parameters that contain spaces
#
#     Returns:
#         (string): String containing parameters wrapped in double quotes
#     """
#     if len(re.findall(r'[!\-\s\$]{1,}', data)) > 0:
#         data = '"{}"'.format(data)
#     return data