我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用voluptuous.Schema()。
def test_isfile(): """Validate that the value is an existing file.""" schema = vol.Schema(cv.isfile) fake_file = 'this-file-does-not.exist' assert not os.path.isfile(fake_file) for value in ('invalid', None, -1, 0, 80000, fake_file): with pytest.raises(vol.Invalid): schema(value) # patching methods that allow us to fake a file existing # with write access with patch('os.path.isfile', Mock(return_value=True)), \ patch('os.access', Mock(return_value=True)): schema('test.txt')
def test_entity_ids(): """Test entity ID validation.""" schema = vol.Schema(cv.entity_ids) for value in ( 'invalid_entity', 'sensor.light,sensor_invalid', ['invalid_entity'], ['sensor.light', 'sensor_invalid'], ['sensor.light,sensor_invalid'], ): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ( [], ['sensor.light'], 'sensor.light' ): schema(value) assert schema('sensor.LIGHT, light.kitchen ') == [ 'sensor.light', 'light.kitchen' ]
def test_time_period(): """Test time_period validation.""" schema = vol.Schema(cv.time_period) for value in ( None, '', 'hello:world', '12:', '12:34:56:78', {}, {'wrong_key': -10} ): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ( '8:20', '23:59', '-8:20', '-23:59:59', '-48:00', {'minutes': 5}, 1, '5' ): schema(value) assert timedelta(seconds=180) == schema('180') assert timedelta(hours=23, minutes=59) == schema('23:59') assert -1 * timedelta(hours=1, minutes=15) == schema('-1:15')
def test_socket_timeout(): """Test socket timeout validator.""" TEST_CONF_TIMEOUT = 'timeout' schema = vol.Schema( {vol.Required(TEST_CONF_TIMEOUT, default=None): cv.socket_timeout}) with pytest.raises(vol.Invalid): schema({TEST_CONF_TIMEOUT: 0.0}) with pytest.raises(vol.Invalid): schema({TEST_CONF_TIMEOUT: -1}) assert _GLOBAL_DEFAULT_TIMEOUT == schema({TEST_CONF_TIMEOUT: None})[TEST_CONF_TIMEOUT] assert 1.0 == schema({TEST_CONF_TIMEOUT: 1})[TEST_CONF_TIMEOUT]
def _build_attributes_validator(mcs): """ Returns validator to validate the sub-classes attributes. """ valid_attributes = { Required("commands", 'required class attribute'): [ { Required("name"): str, Required("cmd"): [str], Optional("kill-signal", default=signal.SIGINT): int } ] } for attr_name, class_client in mcs._class_clients.items(): client_validator = { Required("name"): str, } client_validator.update(class_client.validator()) key = Optional(attr_name, 'required class attribute') valid_attributes[key] = [client_validator] return Schema(valid_attributes, extra=ALLOW_EXTRA)
def valid_adapter_response(base_name, func_name, data): """ Valid that given data match described schema :param str base_name: The name of the asbtract :param str func_name: The name of the called function :parma raw data: The data to valid :raises InvalidFormatError: if data is not compliant """ if not data: return try: Schemas[base_name][func_name](data) except (KeyError, TypeError, ValueError): raise SchemaNotFound('Schema not found for %s.%s' % (base_name, func_name)) except (Invalid, MultipleInvalid) as ex: raise InvalidFormatError('Given data is not compliant to %s.%s schema: %s' % (base_name, func_name, str(ex)))
def validate_body(schema_desc): """ Validate json body """ def real_decorator(func): @wraps(func) def wrapper(*args, **kwargs): try: body = request.get_json() if not Schemas.get(func.__name__): Schemas[func.__name__] = Schema(schema_desc, required=True) Logger.debug(unicode('registering schema for %s' % (func.__name__))) Schemas[func.__name__](body) except (Invalid, MultipleInvalid) as ex: Logger.error(unicode(ex)) msg = 'Missing or invalid field(s) in body, expecting {}'.format(schema_desc) raise BadRequest(msg) return func(*args, **kwargs) return wrapper return real_decorator
def getSchema(self, data): schema = v.Schema({v.Required('servers'): self.servers, 'palettes': self.palettes, 'palette': str, 'keymaps': self.keymaps, 'keymap': str, 'commentlinks': self.commentlinks, 'dashboards': self.dashboards, 'reviewkeys': self.reviewkeys, 'story-list-query': str, 'diff-view': str, 'hide-comments': self.hide_comments, 'display-times-in-utc': bool, 'handle-mouse': bool, 'breadcrumbs': bool, 'story-list-options': self.story_list_options, 'expire-age': str, }) return schema
def __init__(self): self.redisinstance = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0) self.validateschema = Schema({ Required(RegistryKeys.SERVICE_ID): Any(str, unicode), Required(RegistryKeys.SERVICE_NAME): Any(str, unicode), Required(RegistryKeys.SERVICE_PORT): int, Required(RegistryKeys.NAMESPACE, default='default'): Any(str, unicode), Required(RegistryKeys.ENDPOINTS): [{ Required(RegistryKeys.URI): Any(str, unicode), Required(RegistryKeys.ACCEPTANCE_REGEX): Any(str, unicode), RegistryKeys.FILTER_REGEX: { Required(RegistryKeys.PATTERN): Any(str, unicode), Required(RegistryKeys.REPLACE): Any(str, unicode) } }] })
def validate(self): supplier = Schema({ Required('ruc'): All(str, Length(min=11, max=11)), Required('registration_name'): str, Optional('address'): dict, Optional('commercial_name'): str }, extra=ALLOW_EXTRA) customer = Schema({ Required('ruc'): All(str, Length(min=1, max=11)) }, extra=ALLOW_EXTRA) schema = Schema({ Required('issue_date'): str, Required('supplier'): supplier, Required('customer'): customer, Required('voucher_type'): str, Required('currency'): str, Required('voucher_number'): str, Required('lines'): All(list, Length(min=1)) }, extra=ALLOW_EXTRA) schema(self._data)
def __init__(self, type, name, required, options=None): if (len(name) > 63 or name in INVALID_NAMES or not VALID_CHARS.match(name)): raise InvalidResourceAttributeName(name) self.name = name self.required = required self.fill = None # options is set only when we update a resource type if options is not None: fill = options.get("fill") if fill is None and required: raise InvalidResourceAttributeOption( name, "fill", "must not be empty if required=True") elif fill is not None: # Ensure fill have the correct attribute type try: self.fill = voluptuous.Schema(self.schema_ext)(fill) except voluptuous.Error as e: raise InvalidResourceAttributeOption(name, "fill", e)
def __init__(self, *args, **kwargs): super(ResourceTypeSchemaManager, self).__init__(*args, **kwargs) type_schemas = tuple([ext.plugin.meta_schema() for ext in self.extensions]) self._schema = voluptuous.Schema({ "name": six.text_type, voluptuous.Required("attributes", default={}): { six.text_type: voluptuous.Any(*tuple(type_schemas)) } }) type_schemas = tuple([ext.plugin.meta_schema(for_update=True) for ext in self.extensions]) self._schema_for_update = voluptuous.Schema({ "name": six.text_type, voluptuous.Required("attributes", default={}): { six.text_type: voluptuous.Any(*tuple(type_schemas)) } })
def post(self): enforce("create archive policy rule", {}) ArchivePolicyRuleSchema = voluptuous.Schema({ voluptuous.Required("name"): six.text_type, voluptuous.Required("metric_pattern"): six.text_type, voluptuous.Required("archive_policy_name"): six.text_type, }) body = deserialize_and_validate(ArchivePolicyRuleSchema) enforce("create archive policy rule", body) try: ap = pecan.request.indexer.create_archive_policy_rule( body['name'], body['metric_pattern'], body['archive_policy_name'] ) except indexer.ArchivePolicyRuleAlreadyExists as e: abort(409, six.text_type(e)) location = "/archive_policy_rule/" + ap.name set_resp_location_hdr(location) pecan.response.status = 201 return ap
def load_tcconfig(self, config_file_path): import json from voluptuous import Schema, Required, Any, ALLOW_EXTRA schema = Schema({ Required(six.text_type): { Any(*TrafficDirection.LIST): { six.text_type: { six.text_type: Any(six.text_type, int, float) }, } }, }, extra=ALLOW_EXTRA) with open(config_file_path) as fp: self.__config_table = json.load(fp) schema(self.__config_table) self.__logger.debug("tc config file: {:s}".format( json.dumps(self.__config_table, indent=4)))
def get(self): parser = reqparse.RequestParser() return_data = [] parser.add_argument('subvoat_name') args = parser.parse_args() schema = Schema({ Required('subvoat_name'): All(str, Length(min=self.config['min_length_subvoat_name']))}) try: schema({'subvoat_name':args.get('subvoat_name')}) except MultipleInvalid as e: return {'error':'%s %s' % (e.msg, e.path)} posts = self.subvoat_utils.get_posts(args['subvoat_name']) for p in posts: return_data.append(p) return {'result':return_data}
def page(self, start, end): s_schema = Schema({ Required('start'): All(int, Range(min=0))}) e_schema = Schema({ Required('end'): All(int, Range(min=0))}) s_status, s_result = self.try_schema('start', start, s_schema) if not s_status: return [s_status, s_result] e_status, e_result = self.try_schema('end', end, e_schema) if not e_status: return [e_status, e_result] if end < start: return [False, 'ending page cannot be lower than starting page'] total_pages = end - start if total_pages > 50: return [False, 'you cannot request more than 50 pages'] return [True, None]
def __init__(self, server): self.server = server self.guild_man = server.guild_man self.channel_edit_base = Schema({ 'name': All(str, Length(min=2, max=100)), 'position': int, Optional('nsfw'): bool, }, required=True) self.textchan_editschema = self.channel_edit_base.extend({ 'topic': All(str, Length(min=0, max=1024)) }) self.voicechan_editschema = self.channel_edit_base.extend({ 'bitrate': All(int, Range(min=8000, max=96000)), 'user_limit': All(int, Range(min=0, max=99)), }) self.register()
def test_boolean(): """Test boolean validation.""" schema = vol.Schema(cv.boolean) for value in ('T', 'negative', 'lock'): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ('true', 'On', '1', 'YES', 'enable', 1, True): assert schema(value) for value in ('false', 'Off', '0', 'NO', 'disable', 0, False): assert not schema(value)
def test_latitude(): """Test latitude validation.""" schema = vol.Schema(cv.latitude) for value in ('invalid', None, -91, 91, '-91', '91', '123.01A'): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ('-89', 89, '12.34'): schema(value)
def test_longitude(): """Test longitude validation.""" schema = vol.Schema(cv.longitude) for value in ('invalid', None, -181, 181, '-181', '181', '123.01A'): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ('-179', 179, '12.34'): schema(value)
def test_port(): """Test TCP/UDP network port.""" schema = vol.Schema(cv.port) for value in ('invalid', None, -1, 0, 80000, '81000'): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ('1000', 21, 24574): schema(value)
def test_entity_id(): """Test entity ID validation.""" schema = vol.Schema(cv.entity_id) with pytest.raises(vol.MultipleInvalid): schema('invalid_entity') assert 'sensor.light' == schema('sensor.LIGHT')
def test_icon(): """Test icon validation.""" schema = vol.Schema(cv.icon) for value in (False, 'work', 'icon:work'): with pytest.raises(vol.MultipleInvalid): schema(value) schema('mdi:work')
def test_service(): """Test service validation.""" schema = vol.Schema(cv.service) with pytest.raises(vol.MultipleInvalid): schema('invalid_turn_on') schema('scarlett_os.turn_on')
def test_string(): """Test string validation.""" schema = vol.Schema(cv.string) with pytest.raises(vol.MultipleInvalid): schema(None) for value in (True, 1, 'hello'): schema(value)
def test_temperature_unit(): """Test temperature unit validation.""" schema = vol.Schema(cv.temperature_unit) with pytest.raises(vol.MultipleInvalid): schema('K') schema('C') schema('F')
def test_x10_address(): """Test x10 addr validator.""" schema = vol.Schema(cv.x10_address) with pytest.raises(vol.Invalid): schema('Q1') schema('q55') schema('garbage_addr') schema('a1') schema('C11')
def test_time_zone(): """Test time zone validation.""" schema = vol.Schema(cv.time_zone) with pytest.raises(vol.MultipleInvalid): schema('America/Do_Not_Exist') schema('America/Los_Angeles') schema('UTC')
def test_has_at_least_one_key(): """Test has_at_least_one_key validator.""" schema = vol.Schema(cv.has_at_least_one_key('beer', 'soda')) for value in (None, [], {}, {'wine': None}): with pytest.raises(vol.MultipleInvalid): schema(value) for value in ({'beer': None}, {'soda': None}): schema(value)
def test_ordered_dict_key_validator(): """Test ordered_dict key validator.""" schema = vol.Schema(cv.ordered_dict(cv.match_all, cv.string)) with pytest.raises(vol.Invalid): schema({None: 1}) schema({'hello': 'world'}) schema = vol.Schema(cv.ordered_dict(cv.match_all, int)) with pytest.raises(vol.Invalid): schema({'hello': 1}) schema({1: 'works'})
def test_ordered_dict_value_validator(): """Test ordered_dict validator.""" schema = vol.Schema(cv.ordered_dict(cv.string)) with pytest.raises(vol.Invalid): schema({'hello': None}) schema({'hello': 'world'}) schema = vol.Schema(cv.ordered_dict(int)) with pytest.raises(vol.Invalid): schema({'hello': 'world'}) schema({'hello': 5})
def test_enum(): """Test enum validator.""" class TestEnum(enum.Enum): """Test enum.""" value1 = "Value 1" value2 = "Value 2" schema = vol.Schema(cv.enum(TestEnum)) with pytest.raises(vol.Invalid): schema('value3') TestEnum['value1']
def url(value: Any) -> str: """Validate an URL.""" url_in = str(value) if urlparse(url_in).scheme in ['http', 'https']: return vol.Schema(vol.Url())(url_in) raise vol.Invalid('invalid url')
def _state_variable_create_schema(self, type_info): # construct validators validators = [] data_type = type_info['data_type_python'] validators.append(data_type) if 'allowed_values' in type_info: allowed_values = type_info['allowed_values'] in_ = vol.In(allowed_values) # coerce allowed values? assume always string for now validators.append(in_) if 'allowed_value_range' in type_info: min_ = type_info['allowed_value_range'].get('min', None) max_ = type_info['allowed_value_range'].get('max', None) min_ = data_type(min_) max_ = data_type(max_) range_ = vol.Range(min=min_, max=max_) validators.append(range_) # construct key key = vol.Required('value') if 'default_value' in type_info: default_value = type_info['default_value'] if data_type == bool: default_value = default_value == '1' else: default_value = data_type(default_value) key.default = default_value return vol.Schema({key: vol.All(*validators)})
def test_can_set_default_manifest_file(self): with tempfile.TemporaryDirectory() as d: with (Path(d) / 'default.pp').open('w') as f: f.write('dummy pp file') config = {'manifests_path': d} try: final_config = Schema(PuppetProvisioner.schema)(config) except Invalid as e: pytest.fail("schema validation didn't pass: {}".format(e)) assert final_config == { 'manifest_file': 'default.pp', 'manifests_path': d}
def test_can_set_default_environment(self): with tempfile.TemporaryDirectory() as d: os.mkdir(str(Path(d) / 'production')) config = {'environment_path': d} try: final_config = Schema(PuppetProvisioner.schema)(config) except Invalid as e: pytest.fail("schema validation didn't pass: {}".format(e)) assert final_config == { 'environment': 'production', 'environment_path': d}
def test_can_set_default_mode_and_manifest_file(self, mock_isfile, mock_isdir): config = {} try: final_config = Schema(PuppetProvisioner.schema)(config) except Invalid as e: pytest.fail("schema validation didn't pass: {}".format(e)) assert final_config == { 'manifests_path': 'manifests', 'manifest_file': 'default.pp'}
def action_model(value): """Validates the data against action_model schema.""" action_model_schema = voluptuous.Schema({ "action": voluptuous.And(basestring, lambda o: not o.startswith("_")) }, required=True) return action_model_schema(value)
def banana_model(value): """Validates the data against the banana_model schema.""" banana_model_schema = voluptuous.Schema({ "content": basestring }, required=True) return banana_model_schema(value)
def validate_config(_config): lingam_schema = voluptuous.Schema({ "module": voluptuous.And(basestring, vu.NoSpaceCharacter()), "threshold": float }, required=True) return lingam_schema(_config)
def validate_config(_config): svm_schema = voluptuous.Schema({ "module": voluptuous.And(basestring, vu.NoSpaceCharacter()), "nb_samples": voluptuous.Or(float, int) }, required=True) return svm_schema(_config)
def validate_config(_config): log_reg_schema = voluptuous.Schema({ 'module': voluptuous.And( basestring, NoSpaceCharacter()), 'nb_samples': voluptuous.Or(float, int) }, required=True) return log_reg_schema(_config)
def validate_config(_config): svc_schema = voluptuous.Schema({ 'module': voluptuous.And( basestring, NoSpaceCharacter()), 'nb_samples': voluptuous.Or(float, int) }, required=True) return svc_schema(_config)
def validate_config(_config): elliptic_schema = voluptuous.Schema({ 'module': voluptuous.And( basestring, NoSpaceCharacter()), 'nb_samples': voluptuous.Or(float, int) }, required=True) return elliptic_schema(_config)
def validate_config(_config): decisiontree_schema = voluptuous.Schema({ 'module': voluptuous.And( basestring, NoSpaceCharacter()), 'nb_samples': voluptuous.Or(float, int) }, required=True) return decisiontree_schema(_config)
def validate_config(_config): randomforest_schema = voluptuous.Schema({ 'module': voluptuous.And( basestring, NoSpaceCharacter()), 'nb_samples': voluptuous.Or(float, int) }, required=True) return randomforest_schema(_config)