我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用voluptuous.Invalid()。
def load_config(path='.boss.yml'): loader = j.FileSystemLoader('.') try: template = loader.load(j.Environment(), path, os.environ) yml = template.render() doc = yaml.load(yml) return transform_config(doc) except j.TemplateNotFound: error = 'Error loading {}: not found'.format(path) raise ConfigurationError(error) except j.TemplateSyntaxError as e: error = 'Error loading {}: {}, line {}'.format(path, e, e.lineno) raise ConfigurationError(error) except IOError as e: error = 'Error loading {}: {}'.format(path, e.strerror) raise ConfigurationError(error) except v.Invalid as e: error = 'Error validating {}: {}'.format(path, e) raise ConfigurationError(error)
def test_isfile(): """Validate that the value is an existing file.""" schema = vol.Schema(cv.isfile) fake_file = 'this-file-does-not.exist' assert not os.path.isfile(fake_file) for value in ('invalid', None, -1, 0, 80000, fake_file): with pytest.raises(vol.Invalid): schema(value) # patching methods that allow us to fake a file existing # with write access with patch('os.path.isfile', Mock(return_value=True)), \ patch('os.access', Mock(return_value=True)): schema('test.txt')
def match_all(value): """Validator that matches all values.""" return value # def platform_validator(domain): # """Validate if platform exists for given domain.""" # def validator(value): # """Test if platform exists.""" # if value is None: # raise vol.Invalid('platform cannot be None') # if get_platform(domain, str(value)): # return value # raise vol.Invalid( # 'platform {} does not exist for {}'.format(value, domain)) # return validator
def __new__(mcs, cls_name, bases, attrs, previous=None, start=False): if bases and TestServer in bases: for attr_name in mcs._class_clients: attrs.setdefault(attr_name, []) validate_attributes = mcs._build_attributes_validator() try: attrs = validate_attributes(attrs) except Invalid as error: msg = "{} @ {}.{}".format(error.error_message, cls_name, error.path[0]) msg += ''.join('[{!r}]'.format(element) for element in error.path[1:]) raise AttributeError(msg) return MetaTestState.__new__(mcs, cls_name, bases, attrs, previous, start)
def valid_adapter_response(base_name, func_name, data): """ Valid that given data match described schema :param str base_name: The name of the asbtract :param str func_name: The name of the called function :parma raw data: The data to valid :raises InvalidFormatError: if data is not compliant """ if not data: return try: Schemas[base_name][func_name](data) except (KeyError, TypeError, ValueError): raise SchemaNotFound('Schema not found for %s.%s' % (base_name, func_name)) except (Invalid, MultipleInvalid) as ex: raise InvalidFormatError('Given data is not compliant to %s.%s schema: %s' % (base_name, func_name, str(ex)))
def validate_body(schema_desc): """ Validate json body """ def real_decorator(func): @wraps(func) def wrapper(*args, **kwargs): try: body = request.get_json() if not Schemas.get(func.__name__): Schemas[func.__name__] = Schema(schema_desc, required=True) Logger.debug(unicode('registering schema for %s' % (func.__name__))) Schemas[func.__name__](body) except (Invalid, MultipleInvalid) as ex: Logger.error(unicode(ex)) msg = 'Missing or invalid field(s) in body, expecting {}'.format(schema_desc) raise BadRequest(msg) return func(*args, **kwargs) return wrapper return real_decorator
def post(self): try: body = json.loads(self.request.body) web_service_model.banana_model(body) type_table = self._monanas.compute_type_table(body["content"]) self.write(type_table) except (AttributeError, voluptuous.Invalid, ValueError) as e: logger.warn("Wrong request: {}.". format(e)) self.set_status(400, "The request body was malformed.") except Exception as e: tb = traceback.format_exc() print(tb) logger.error("Unexpected error: {}. {}". format(sys.exc_info()[0], e)) self.set_status(500, "Internal server error.") self.flush() self.finish()
def validate_links(links): """Validate links to make sure, nothing is missing :type links: dict :param links: connection links to validate :raises: SchemaError -- if any link is missing """ missing = set([]) all_keys = set(links.keys()) for connections in links.values(): for component in connections: if component not in all_keys: missing.add(component.id()) if len(missing) > 0: raise voluptuous.Invalid([ "In connections section, the following components are not " "connected\n\t{}\n" "please modify the configuration so that their list of " "connections is at least '[]'".format(", ".join(missing))], [])
def _validate_existing_id(config, component_id): """Check that the id passed as parameter is defined in the configuration :type config: dict :param config: configuration model for the whole system :type component_id: str :param component_id: component ID to be found in configuration """ found_id = False for comp_type in valid_connection_types.keys(): if component_id in config[comp_type].keys(): found_id = True if not found_id: raise voluptuous.Invalid([ 'In "connections", component `{}` hasn\'t been defined' .format(component_id) ], [])
def get_pagination_options(params, default): try: opts = voluptuous.Schema({ voluptuous.Required( "limit", default=pecan.request.conf.api.max_limit): voluptuous.All(voluptuous.Coerce(int), voluptuous.Range(min=1), voluptuous.Clamp( min=1, max=pecan.request.conf.api.max_limit)), "marker": six.text_type, voluptuous.Required("sort", default=default): voluptuous.All( voluptuous.Coerce(arg_to_list), [six.text_type]), }, extra=voluptuous.REMOVE_EXTRA)(params) except voluptuous.Invalid as e: abort(400, {"cause": "Argument value error", "reason": str(e)}) opts['sorts'] = opts['sort'] del opts['sort'] return opts
def post(self): schema = pecan.request.indexer.get_resource_type_schema() body = deserialize_and_validate(schema) body["state"] = "creating" try: rt = schema.resource_type_from_dict(**body) except resource_type.InvalidResourceAttribute as e: abort(400, "Invalid input: %s" % e) enforce("create resource type", body) try: rt = pecan.request.indexer.create_resource_type(rt) except indexer.ResourceTypeAlreadyExists as e: abort(409, six.text_type(e)) set_resp_location_hdr("/resource_type/" + rt.name) pecan.response.status = 201 return rt
def test_register_operation_wraps_protectron_unmatching_schema(): @register_operation @protectron(Schema({'a': int})) def mirror2(a): return a event = { 'operation': 'mirror2', 'args': { 'a': '42' } } with pytest.raises(Invalid): assert dispatch_event(event) == 42
def Alphanumeric(msg=None): ''' Checks whether a value is: - int, or - long, or - float without a fractional part, or - str or unicode composed only of alphanumeric characters ''' def fn(value): if not any([ isinstance(value, numbers.Integral), (isinstance(value, float) and value.is_integer()), (isinstance(value, basestring) and value.isalnum()) ]): raise Invalid(msg or ( 'Invalid input <{0}>; expected an integer'.format(value)) ) else: return value return fn
def StrictlyAlphanumeric(msg=None): ''' Checks whether a value is: - str or unicode, and - composed of both alphabets and digits ''' def fn(value): if not ( isinstance(value, basestring) and value.isalnum() and not value.isdigit() and not value.isalpha() ): raise Invalid(msg or ( 'Invalid input <{0}>; expected an integer'.format(value)) ) else: return value return fn
def __call__(self, value): ''' Checks whether a value is list of given validation_function. Returns list of validated values or just one valid value in list if there is only one element in given CSV string. ''' try: if isinstance(value, basestring): if self.separator in value: seperated_string_values =[item.strip() for item in value.split(self.separator)] values = [self.validation_function(item) for item in seperated_string_values] else: values = [self.validation_function(value)] return values else: raise ValueError except (Invalid, ValueError) as e: raise Invalid(self.msg or ('<{0}> is not valid set of <{1}>, {2}'.format( value, self.value_type, e)))
def test_update_active_instance_entity_with_bad_payload(self): self.entity_ctl.update_active_instance_entity.side_effect = ValueError( 'Expecting object: line 1 column 15 (char 14)' ) instance_id = 'INSTANCE_ID' data = { 'flavor': 'A_FLAVOR', } code, result = self.api_put('/v1/entity/instance/INSTANCE_ID', data=data, headers={'X-Auth-Token': 'some token value'}) self.entity_ctl.update_active_instance_entity.assert_called_once_with(instance_id=instance_id, **data) self.assertIn("error", result) self.assertEqual(result["error"], 'Invalid parameter or payload') self.assertEqual(code, 400)
def test_update_active_instance_entity_with_wrong_attribute_raise_exception(self): errors = [ Invalid(message="error message1", path=["my_attribute1"]), Invalid(message="error message2", path=["my_attribute2"]), ] self.entity_ctl.update_active_instance_entity.side_effect = exception.InvalidAttributeException(errors) formatted_errors = { "my_attribute1": "error message1", "my_attribute2": "error message2", } instance_id = 'INSTANCE_ID' data = { 'flavor': 'A_FLAVOR', } code, result = self.api_put('/v1/entity/instance/INSTANCE_ID', data=data, headers={'X-Auth-Token': 'some token value'}) self.entity_ctl.update_active_instance_entity.assert_called_once_with(instance_id=instance_id, **data) self.assertIn("error", result) self.assertEqual(result['error'], formatted_errors) self.assertEqual(code, 400)
def valid_format_string(valid_fields): """ Ensure that the provided string can be parsed as a python format string, and contains only `valid_fields` :param valid_fields: set or sequence of valid field names """ f = Formatter() valid_fields = set(valid_fields) def validate_string(format_string): fields = set(field_name for _, field_name, _, _ in f.parse(format_string) if field_name) if fields < valid_fields: return format_string else: raise Invalid('format string specifies invalid field(s): %s' % (fields - valid_fields)) return validate_string
def test_socket_timeout(): """Test socket timeout validator.""" TEST_CONF_TIMEOUT = 'timeout' schema = vol.Schema( {vol.Required(TEST_CONF_TIMEOUT, default=None): cv.socket_timeout}) with pytest.raises(vol.Invalid): schema({TEST_CONF_TIMEOUT: 0.0}) with pytest.raises(vol.Invalid): schema({TEST_CONF_TIMEOUT: -1}) assert _GLOBAL_DEFAULT_TIMEOUT == schema({TEST_CONF_TIMEOUT: None})[TEST_CONF_TIMEOUT] assert 1.0 == schema({TEST_CONF_TIMEOUT: 1})[TEST_CONF_TIMEOUT]
def test_config_error(self): """Test for configuration errors.""" with self.assertRaises(vol.Invalid): unifi.PLATFORM_SCHEMA({ # no username CONF_PLATFORM: unifi.DOMAIN, CONF_HOST: 'myhost', 'port': 123, }) with self.assertRaises(vol.Invalid): unifi.PLATFORM_SCHEMA({ CONF_PLATFORM: unifi.DOMAIN, CONF_USERNAME: 'foo', CONF_PASSWORD: 'password', CONF_HOST: 'myhost', 'port': 'foo', # bad port! })
def socket_timeout(value): """Validate timeout float > 0.0. None coerced to socket._GLOBAL_DEFAULT_TIMEOUT bare object. """ if value is None: return _GLOBAL_DEFAULT_TIMEOUT else: try: float_value = float(value) if float_value > 0.0: return float_value raise vol.Invalid('Invalid socket timeout value.' ' float > 0.0 required.') except Exception as _: raise vol.Invalid('Invalid socket timeout: {err}'.format(err=_)) # pylint: disable=no-value-for-parameter
def post(self, request): """Accept the POST request for push registrations from a browser.""" try: data = yield from request.json() except ValueError: return self.json_message('Invalid JSON', HTTP_BAD_REQUEST) try: data = REGISTER_SCHEMA(data) except vol.Invalid as ex: return self.json_message(humanize_error(data, ex), HTTP_BAD_REQUEST) name = ensure_unique_string('unnamed device', self.registrations.keys()) self.registrations[name] = data if not _save_config(self.json_path, self.registrations): return self.json_message('Error saving registration.', HTTP_INTERNAL_SERVER_ERROR) return self.json_message('Push notification subscriber registered.')
def post(self, request): """Handle the POST request for device identification.""" try: req_data = yield from request.json() except ValueError: return self.json_message('Invalid JSON', HTTP_BAD_REQUEST) try: data = IDENTIFY_SCHEMA(req_data) except vol.Invalid as ex: return self.json_message(humanize_error(request.json, ex), HTTP_BAD_REQUEST) name = data.get(ATTR_DEVICE_ID) CONFIG_FILE[ATTR_DEVICES][name] = data if not _save_config(CONFIG_FILE_PATH, CONFIG_FILE): return self.json_message("Error saving device.", HTTP_INTERNAL_SERVER_ERROR) return self.json({"status": "registered"})
def check(*callback_tuples): """ Voluptuous wrapper function to raise our APIException Args: callback_tuples: a callback_tuple should contain (status, msg, callbacks) Returns: Returns a function callback for the Schema """ def v(value): """ Trys to validate the value with the given callbacks. Args: value: the item to validate Raises: APIException with the given error code and msg. Returns: The value if the validation callbacks are satisfied. """ for msg, callbacks in callback_tuples: for callback in callbacks: try: result = callback(value) if not result and type(result) == bool: raise Invalid() except Exception: raise WebException(msg) return value return v
def CNPJ(value): if not validate_cnpj(value): raise Invalid("Invalid CNPJ") return clean_id(value)
def CPF(value): if not validate_cpf(value): raise Invalid("Invalid CPF") return clean_id(value)
def CEP(value): try: format_cep(value) except ValueError as e: raise Invalid(e) return clean_id(value)
def Date(value): fmt = '%d/%m/%Y' try: if isinstance(value, int): value = datetime.date.today() + datetime.timedelta(value) return value.strftime(fmt) except Exception: raise Invalid("Should be an instance of datetime.datetime")
def __init__(self, **kargs): super(EntityBase, self).__init__(kargs) self.validate() try: raise getattr(self, 'error') except AttributeError: pass except Invalid as e: raise MultipleInvalid([e])
def test_invalid_ceps(self): for i in self.invalid_cases: self.assertRaises(Invalid, CEP, i)
def test_invalid_prices(self): for i in self.invalid_cases: self.assertRaises(Invalid, Price, i)
def test_invalid_cnpjs(self): for i in self.invalid_cases: self.assertRaises(Invalid, CNPJ, i)
def test_invalid_emails(self): for i in self.invalid_cases: self.assertRaises(Invalid, Email, i)
def test_invalid_dates(self): for i in self.invalid_cases: self.assertRaises(Invalid, Date, i)
def Price(value): if not (type(value) == int or type(value) == float): raise Invalid("This price is not and integer or a float.") value = float(value) return "%.2f" % value
def test_invalid_cnpjs(self): for i in self.invalid_cases: self.assertRaises(Invalid, CPF, i)
def invalid(kind, item): return v.Invalid('Invalid {}: {}'.format(kind, item))