我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用jsonschema.RefResolver()。
def _resolve_refs(uri, spec): """Resolve JSON references in a given dictionary. OpenAPI spec may contain JSON references to its nodes or external sources, so any attempt to rely that there's some expected attribute in the spec may fail. So we need to resolve JSON references before we use it (i.e. replace with referenced object). For details see: https://tools.ietf.org/html/draft-pbryan-zyp-json-ref-02 The input spec is modified in-place despite being returned from the function. """ resolver = jsonschema.RefResolver(uri, spec) def _do_resolve(node): if isinstance(node, collections.Mapping) and '$ref' in node: with resolver.resolving(node['$ref']) as resolved: return resolved elif isinstance(node, collections.Mapping): for k, v in node.items(): node[k] = _do_resolve(v) elif isinstance(node, (list, tuple)): for i in range(len(node)): node[i] = _do_resolve(node[i]) return node return _do_resolve(spec)
def load_validator(schema_path, schema): """Create a JSON schema validator for the given schema. Args: schema_path: The filename of the JSON schema. schema: A Python object representation of the same schema. Returns: An instance of Draft4Validator. """ # Get correct prefix based on OS if os.name == 'nt': file_prefix = 'file:///' else: file_prefix = 'file:' resolver = RefResolver(file_prefix + schema_path.replace("\\", "/"), schema) validator = Draft4Validator(schema, resolver=resolver) return validator
def __init__(self, versions, path, skip_unknown=False, min_date='2016.09.01', future_hours=24): """ ????? ??? ????????? ?????????? ?? ??? ?? json-?????. :param versions: ?????????????? ?????? ?????????, ???????? ['1.0', '1.05']. :param path: ???? ?? ??????????, ??????? ???????? ??? ?????????? ?? ???????, ???????? ?? ???????, ????????, ????? ??? ????????? 1.0 ?????? ?????? ? <path>/1.0/ :param skip_unknown: ???? ????? ?????? ?????????? ?? ?????????????? ?????????? ????????? """ self._validators = {} self._skip_unknown = skip_unknown schema_dir = os.path.expanduser(path) schema_dir = os.path.abspath(schema_dir) self.min_date = datetime.datetime.strptime(min_date, '%Y.%m.%d') if min_date else None self.future_hours = future_hours for version in versions: full_path = os.path.join(schema_dir, version, 'document.schema.json') with open(full_path, encoding='utf-8') as fh: schema = json.loads(fh.read()) resolver = jsonschema.RefResolver('file://' + full_path, None) validator = Draft4Validator(schema=schema, resolver=resolver) validator.check_schema(schema) # ?????????, ??? ???? ????? - ???????? self._validators[version] = validator
def resolve_schema_references(schema, refs=None): '''Resolves and replaces json-schema $refs with the appropriate dict. Recursively walks the given schema dict, converting every instance of $ref in a 'properties' structure with a resolved dict. This modifies the input schema and also returns it. Arguments: schema: the schema dict refs: a dict of <string, dict> which forms a store of referenced schemata Returns: schema ''' refs = refs or {} return _resolve_schema_references(schema, RefResolver("", schema, store=refs))
def __init__(self, api): """ Instantiate a new Lepo router. :param api: The OpenAPI definition object. :type api: dict """ self.api = deepcopy(api) self.api.pop('host', None) self.handlers = {} self.resolver = RefResolver('', self.api)
def test_is_valid_call_validate_with_resolver_instance(self, mock_validate): await self.result.is_valid() self.assertTrue(mock_validate.called) resolver = mock_validate.call_args[-1]['resolver'] self.assertIsInstance(resolver, RefResolver) http_handler, https_handler = list(resolver.handlers.values()) self.assertEqual(http_handler, self.result.session_request_json) self.assertEqual(https_handler, self.result.session_request_json)
def resolve(self, ref, document=None): """Resolve a ref within the schema. This is just a convenience method, since RefResolver returns both a URI and the resolved value, and we usually just need the resolved value. :param str ref: URI to resolve. :param dict document: Optional schema in which to resolve the URI. :returns: the portion of the schema that the URI references. :see: :meth:`SchemaRefResolver.resolve` """ _, resolved = self.resolver.resolve(ref, document=document) return resolved
def resolver(self): """jsonschema RefResolver object for the base schema.""" if self._resolver is not None: return self._resolver if self._schema_path is not None: # the documentation for ref resolving # https://github.com/Julian/jsonschema/issues/98 # https://python-jsonschema.readthedocs.org/en/latest/references/ self._resolver = SchemaRefResolver( 'file://' + self._schema_path + '/', self.schema) else: self._resolver = SchemaRefResolver.from_schema(self.schema) return self._resolver
def test_resolver(self): self.assertIsNone(self.schema._resolver) resolver = self.schema.resolver self.assertIsInstance(resolver, jsonschema.RefResolver) self.assertEqual(resolver, self.schema._resolver)
def test_openapi_spec_validity(self): url = self.get_server_url() + '/swagger.json' r = self.login_session().get(url) assert r.status_code == 200 parsed_spec = r.json() assert isinstance(validator20.validate_spec(parsed_spec), RefResolver)
def _validate(self): """Validates self.redfish against self.schema_obj Validates a redfish OrderedDict against the schema object passed on the object creation. Returns: None Exception: ValidationError: Raises this exception on validation failure. OneViewRedfishError: Raises this exception if schema is not found. """ schema_version = util.schemas[self.schema_name] stored_schemas = util.stored_schemas try: schema_obj = stored_schemas[ "http://redfish.dmtf.org/schemas/v1/" + schema_version] except KeyError: raise OneViewRedfishError("{} not found".format(schema_version)) resolver = jsonschema.RefResolver('', schema_obj, store=stored_schemas) jsonschema.validate(self.redfish, schema_obj, resolver=resolver)
def get_schema(self, schema_name): """ Had to remove the id property from map.json or it uses URLs for validation See various issues at https://github.com/Julian/jsonschema/pull/306 """ schema_name += ".json" if schema_name not in self.schemas.keys(): schemas_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), "schemas") schema_file = os.path.join(schemas_folder, schema_name) if not os.path.isfile(schema_file): raise IOError("The file %s does not exist" % schema_file) with open(schema_file) as f: try: jsn_schema = json.loads(f.read()) except ValueError as ex: log.error("Could not load %s", schema_file) raise ex root_schema_path = self.get_schema_path(schemas_folder) resolver = jsonschema.RefResolver(root_schema_path, None) # cache the schema for future use self.schemas[schema_name] = (jsn_schema, resolver) else: jsn_schema, resolver = self.schemas[schema_name] return jsn_schema, resolver
def test_openapi_spec_validity(flask_app_client): raw_openapi_spec = flask_app_client.get('/api/v1/swagger.json').data deserialized_openapi_spec = json.loads(raw_openapi_spec.decode('utf-8')) assert isinstance(validator20.validate_spec(deserialized_openapi_spec), RefResolver)
def test_resolver(self): from jsonschema import RefResolver dirname = os.path.dirname(__file__) schemas_path = 'file://' + os.path.join(dirname, 'schemas/') resolver = RefResolver(schemas_path, None) country_schema_file = \ open(os.path.join(dirname, 'schemas/') + 'country.json') person_schema_file = \ open(os.path.join(dirname, 'schemas/') + 'person.json') country_schema = json.load(country_schema_file) person_schema = json.load(person_schema_file) Country = warlock.model_factory(country_schema, resolver) Person = warlock.model_factory(person_schema, resolver) england = Country( name="England", population=53865800, overlord=Person( title="Queen", firstname="Elizabeth", lastname="Windsor" ) ) expected = { 'name': 'England', 'population': 53865800, 'overlord': { 'title': 'Queen', 'lastname': 'Windsor', 'firstname': 'Elizabeth' } } self.assertEqual(england, expected)
def run(filename, schema, document): resolver = jsonschema.RefResolver( 'file://{0}'.format(filename), schema, store={schema['id']: schema}) jsonschema.validate(document, schema, resolver=resolver)
def resolve(self, ref, document=None): """Resolve a fragment within the schema. If the resolved value contains a $ref, it will attempt to resolve that as well, until it gets something that is not a reference. Circular references will raise a SchemaError. :param str ref: URI to resolve. :param dict document: Optional schema in which to resolve the URI. :returns: a tuple of the final, resolved URI (after any recursion) and resolved value in the schema that the URI references. """ try: # This logic is basically the RefResolver's resolve function, but # updated to support fragments of dynamic documents. The jsonschema # module supports passing documents when resolving fragments, but # it doesn't expose that capability in the resolve function. url = self._urljoin_cache(self.resolution_scope, ref) if document is None: # No document passed, so just resolve it as we normally would. resolved = self._remote_cache(url) else: # Document passed, so assume it's a fragment. _, fragment = urldefrag(url) resolved = self.resolve_fragment(document, fragment) except jsonschema.RefResolutionError as e: # Failed to find a ref. Make the error a bit prettier so we can # figure out where it came from. message = e.args[0] if self._scopes_stack: message = '{} (from {})'.format( message, self._format_stack(self._scopes_stack)) raise SchemaError(message) if isinstance(resolved, dict) and '$ref' in resolved: # Try to resolve the reference, so we can get the actual value we # want, instead of a useless dict with a $ref in it. if url in self._scopes_stack: # We've already tried to look up this URL, so this must # be a circular reference in the schema. raise SchemaError( 'Circular reference in schema: {}'.format( self._format_stack(self._scopes_stack + [url]))) try: self.push_scope(url) return self.resolve(resolved['$ref']) finally: self.pop_scope() else: return url, resolved