我们从Python开源项目中，提取了以下24个代码示例，用于说明如何使用voluptuous.Required()。
def test_socket_timeout():
    """Exercise ``cv.socket_timeout`` through a one-key schema."""
    timeout_key = 'timeout'
    schema = vol.Schema({
        vol.Required(timeout_key, default=None): cv.socket_timeout,
    })

    # Zero and negative timeouts are expected to be refused.
    for rejected in (0.0, -1):
        with pytest.raises(vol.Invalid):
            schema({timeout_key: rejected})

    # None is expected to map onto the global default timeout.
    validated_none = schema({timeout_key: None})
    assert _GLOBAL_DEFAULT_TIMEOUT == validated_none[timeout_key]

    # An integer input is expected to validate to a value equal to 1.0.
    validated_one = schema({timeout_key: 1})
    assert 1.0 == validated_one[timeout_key]
def setUp(self):
    """Register a throwaway ``Client`` subclass that reports to a mock.

    The mock is stored on ``self.spy``; the subclass is bound on
    ``MetaServerTestState`` and a cleanup is scheduled to forget it again.
    """
    recorder = unittest.mock.Mock()
    self.spy = recorder

    class MyClient(Client):
        attr_name = 'my_clients'

        @staticmethod
        def validator():
            # 'foo' is mandatory, 'bar' falls back to 'def bar'.
            rules = {}
            rules[voluptuous.Required('foo')] = int
            rules[voluptuous.Optional('bar', default='def bar')] = str
            return rules

        def __init__(self, **kwargs):
            # Record the constructor keyword arguments on the mock.
            recorder(kwargs)

        def close(self):
            recorder('close')

    def _forget_client():
        return MetaServerTestState.forget_client(MyClient)

    MetaServerTestState.bind_class_client(MyClient)
    self.addCleanup(_forget_client)
def getSchema(self, data):
    """Return the voluptuous schema for the top-level configuration mapping.

    ``servers`` is the only key marked as required.  ``data`` is accepted
    but not consulted when building the schema.
    """
    layout = {
        v.Required('servers'): self.servers,
        'palettes': self.palettes,
        'palette': str,
        'keymaps': self.keymaps,
        'keymap': str,
        'commentlinks': self.commentlinks,
        'dashboards': self.dashboards,
        'reviewkeys': self.reviewkeys,
        'story-list-query': str,
        'diff-view': str,
        'hide-comments': self.hide_comments,
        'display-times-in-utc': bool,
        'handle-mouse': bool,
        'breadcrumbs': bool,
        'story-list-options': self.story_list_options,
        'expire-age': str,
    }
    return v.Schema(layout)
def __init__(self, *args, **kwargs):
    """Initialise the manager and prepare its two validation schemas.

    ``self._schema`` is built from each extension plugin's
    ``meta_schema()``; ``self._schema_for_update`` is built from
    ``meta_schema(for_update=True)``.
    """
    super(ResourceTypeSchemaManager, self).__init__(*args, **kwargs)

    def _make_schema(**meta_kwargs):
        # One alternative per entry in self.extensions.
        alternatives = tuple(
            ext.plugin.meta_schema(**meta_kwargs) for ext in self.extensions
        )
        return voluptuous.Schema({
            "name": six.text_type,
            voluptuous.Required("attributes", default={}): {
                six.text_type: voluptuous.Any(*alternatives),
            },
        })

    self._schema = _make_schema()
    self._schema_for_update = _make_schema(for_update=True)
def get_pagination_options(params, default):
    """Validate pagination arguments and return them as a dict.

    ``limit`` defaults to the configured API ``max_limit``, is coerced to
    int, must be at least 1 and is clamped to ``max_limit``.  ``sort``
    defaults to ``default`` and is returned under the key ``sorts``.
    Keys not named in the schema are removed.  On a validation error,
    ``abort(400, ...)`` is called with the error text.
    """
    max_limit = pecan.request.conf.api.max_limit

    limit_rule = voluptuous.All(
        voluptuous.Coerce(int),
        voluptuous.Range(min=1),
        voluptuous.Clamp(min=1, max=max_limit),
    )
    sort_rule = voluptuous.All(
        voluptuous.Coerce(arg_to_list),
        [six.text_type],
    )
    schema = voluptuous.Schema(
        {
            voluptuous.Required("limit", default=max_limit): limit_rule,
            "marker": six.text_type,
            voluptuous.Required("sort", default=default): sort_rule,
        },
        extra=voluptuous.REMOVE_EXTRA,
    )

    try:
        opts = schema(params)
    except voluptuous.Invalid as e:
        abort(400, {"cause": "Argument value error",
                    "reason": str(e)})

    # Expose the sort list under its plural name.
    opts['sorts'] = opts.pop('sort')
    return opts
def get(self):
    """Return the posts of the subvoat named in the request arguments.

    The ``subvoat_name`` argument is checked against the configured
    minimum length.  On failure a dict with an ``error`` entry is
    returned; otherwise a dict whose ``result`` entry lists the posts.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('subvoat_name')
    args = parser.parse_args()

    min_len = self.config['min_length_subvoat_name']
    name_rule = All(str, Length(min=min_len))
    schema = Schema({Required('subvoat_name'): name_rule})

    try:
        schema({'subvoat_name': args.get('subvoat_name')})
    except MultipleInvalid as e:
        return {'error': '%s %s' % (e.msg, e.path)}

    posts = self.subvoat_utils.get_posts(args['subvoat_name'])
    return {'result': list(posts)}
def page(self, start, end):
    """Validate a page range and return ``[ok, message]``.

    Both bounds must pass a non-negative integer schema, ``end`` must
    not be below ``start``, and the span may not exceed 50 pages.  The
    first failing check determines the returned pair.
    """
    schemas = {
        field: Schema({Required(field): All(int, Range(min=0))})
        for field in ('start', 'end')
    }

    # Check 'start' first, then 'end'; stop at the first failure.
    for field, value in (('start', start), ('end', end)):
        ok, detail = self.try_schema(field, value, schemas[field])
        if not ok:
            return [ok, detail]

    if end < start:
        return [False, 'ending page cannot be lower than starting page']

    if end - start > 50:
        return [False, 'you cannot request more than 50 pages']

    return [True, None]
def _state_variable_create_schema(self, type_info):
    """Build a voluptuous schema validating a mapping with a ``value`` key.

    :param type_info: mapping that must contain ``data_type_python`` (the
        Python type used as the first validator) and may contain
        ``allowed_values``, ``allowed_value_range`` (with optional ``min``
        and ``max`` entries) and ``default_value``.
    :return: ``vol.Schema`` whose single required key ``value`` must pass
        all collected validators.
    """
    data_type = type_info['data_type_python']
    validators = [data_type]

    if 'allowed_values' in type_info:
        # Allowed values are not coerced; assumed to always be strings for now.
        validators.append(vol.In(type_info['allowed_values']))

    if 'allowed_value_range' in type_info:
        value_range = type_info['allowed_value_range']
        min_ = value_range.get('min')
        max_ = value_range.get('max')
        # Only coerce the bounds that are actually given: a one-sided range
        # leaves the other bound as None, and e.g. int(None) would raise.
        if min_ is not None:
            min_ = data_type(min_)
        if max_ is not None:
            max_ = data_type(max_)
        validators.append(vol.Range(min=min_, max=max_))

    if 'default_value' in type_info:
        default_value = type_info['default_value']
        if data_type == bool:
            # Boolean defaults are compared against the string '1'.
            default_value = default_value == '1'
        else:
            default_value = data_type(default_value)
        # Hand the default to the marker's constructor so voluptuous wraps it
        # in its default factory; assigning the attribute afterwards would
        # store a raw, non-callable value.
        key = vol.Required('value', default=default_value)
    else:
        key = vol.Required('value')

    return vol.Schema({key: vol.All(*validators)})
def meta_schema(cls, for_update=False):
    """Return the schema dict describing this attribute type.

    The dict always requires ``type`` (equal to ``cls.typename``) and
    ``required`` (bool, default True).  With ``for_update`` an ``options``
    entry is added.  Finally ``cls.meta_schema_ext`` is merged in, being
    called first when it is callable.
    """
    schema = {}
    schema[voluptuous.Required('type')] = cls.typename
    schema[voluptuous.Required('required', default=True)] = bool

    if for_update:
        schema[voluptuous.Required('options', default={})] = OperationOptions

    extension = (cls.meta_schema_ext()
                 if callable(cls.meta_schema_ext)
                 else cls.meta_schema_ext)
    schema.update(extension)
    return schema
def patch(self):
    """Update the definition of the archive policy named by this controller.

    Calls ``abort(404, ...)`` when the policy is unknown and
    ``abort(400, ...)`` when an item cannot be built or the indexer
    reports an unsupported change.
    """
    policy = pecan.request.indexer.get_archive_policy(self.archive_policy)
    if not policy:
        not_found = indexer.NoSuchArchivePolicy(self.archive_policy)
        abort(404, six.text_type(not_found))
    enforce("update archive policy", policy)

    # The body must carry a non-empty list of definition entries.
    definition_rule = voluptuous.All(
        [{
            "granularity": Timespan,
            "points": PositiveNotNullInt,
            "timespan": Timespan,
        }],
        voluptuous.Length(min=1),
    )
    body_schema = voluptuous.Schema({
        voluptuous.Required("definition"): definition_rule,
    })
    body = deserialize_and_validate(body_schema)

    # Validate the data
    try:
        ap_items = []
        for item in body['definition']:
            ap_items.append(archive_policy.ArchivePolicyItem(**item))
    except ValueError as e:
        abort(400, six.text_type(e))

    try:
        return pecan.request.indexer.update_archive_policy(
            self.archive_policy, ap_items)
    except indexer.UnsupportedArchivePolicyChange as e:
        abort(400, six.text_type(e))
def post(self):
    """Create an archive policy from the request body and return it.

    Sets the response location header to ``/archive_policy/<name>`` and
    the response status to 201.  Calls ``abort(400, ...)`` when the
    policy cannot be built from the body and ``abort(409, ...)`` when the
    indexer reports that it already exists.
    """
    enforce("create archive policy", {})

    # NOTE(jd): Initialize this one at run-time because we rely on conf
    conf = pecan.request.conf
    valid_agg_methods = (
        archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS_VALUES
    )
    default_agg_methods = set(conf.archive_policy.default_aggregation_methods)

    agg_methods_rule = voluptuous.All(
        list(valid_agg_methods),
        voluptuous.Coerce(set),
    )
    definition_rule = voluptuous.All(
        [{
            "granularity": Timespan,
            "points": PositiveNotNullInt,
            "timespan": Timespan,
        }],
        voluptuous.Length(min=1),
    )
    ArchivePolicySchema = voluptuous.Schema({
        voluptuous.Required("name"): six.text_type,
        voluptuous.Required("back_window", default=0): PositiveOrNullInt,
        voluptuous.Required(
            "aggregation_methods",
            default=default_agg_methods): agg_methods_rule,
        voluptuous.Required("definition"): definition_rule,
    })

    body = deserialize_and_validate(ArchivePolicySchema)

    # Validate the data
    try:
        ap = archive_policy.ArchivePolicy.from_dict(body)
    except ValueError as e:
        abort(400, six.text_type(e))
    enforce("create archive policy", ap)

    try:
        ap = pecan.request.indexer.create_archive_policy(ap)
    except indexer.ArchivePolicyAlreadyExists as e:
        abort(409, six.text_type(e))

    set_resp_location_hdr("/archive_policy/" + ap.name)
    pecan.response.status = 201
    return ap
def patch(self):
    """Rename the archive policy rule held by this controller.

    The request body must contain ``name``.  Calls ``abort(400, ...)``
    when the indexer reports an unsupported change.
    """
    rename_schema = voluptuous.Schema({
        voluptuous.Required("name"): six.text_type,
    })
    body = deserialize_and_validate(rename_schema)
    enforce("update archive policy rule", {})

    current_name = self.archive_policy_rule.name
    try:
        return pecan.request.indexer.update_archive_policy_rule(
            current_name, body["name"])
    except indexer.UnsupportedArchivePolicyRuleChange as e:
        abort(400, six.text_type(e))
def parse_option(option_string):
    """Translate one option declaration into a voluptuous schema entry.

    The first double-quoted string in ``option_string`` is taken as the
    option name.  A second quoted string (described in the original code
    as a value regex) is currently ignored.

    :param option_string: declaration text containing one of the markers
        ``OPTION_NO_VALUE``, ``OPTION_HAS_VALUE`` or ``OPTION_OPT_VALUE``.
    :return: single-entry dict mapping a voluptuous marker for the option
        name to a ``vol.Coerce`` validator.
    :raises IndexError: if a recognised declaration has no quoted name.
    :raises RuntimeError: if the option kind or its JSON value type is not
        recognised.
    """
    if 'OPTION_NO_VALUE' in option_string:
        option = re.findall(r'\"(.*?)\"', option_string)[0]
        # The options without values seem to still need a value when used
        # with pilight-daemon, but they are not mandatory options.
        # E.g.: option 'on' is 'on': 1
        return {vol.Optional(option): vol.Coerce(int)}

    # Both value-carrying kinds produce the same schema entry.
    if ('OPTION_HAS_VALUE' in option_string
            or 'OPTION_OPT_VALUE' in option_string):
        option = re.findall(r'\"(.*?)\"', option_string)[0]
        if 'JSON_NUMBER' in option_string:
            return {vol.Required(option): vol.Coerce(int)}
        if 'JSON_STRING' in option_string:
            return {vol.Required(option): vol.Coerce(str)}
        # RuntimeError is what the former bare ``raise`` produced when no
        # exception was active; now it says what went wrong.
        raise RuntimeError(
            'Unsupported JSON value type in option: %r' % (option_string,))

    raise RuntimeError('Unknown option type: %r' % (option_string,))
def test_protectron_kwargs_input_schema_with_default():
    """Expect an omitted ``a`` to be supplied from the input schema default."""
    input_schema = Schema({Required('a', default=42): int, 'b': int})

    def foo(a, b):
        return a

    # Same as decorating with @protectron(input_schema).
    foo = protectron(input_schema)(foo)

    assert foo(b=42) == 42
def subvoat_name(self, subvoat_name):
    """Check a subvoat name: a ``str`` of at least the configured length.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    min_len = self.config['min_length_subvoat_name']
    name_rule = All(str, Length(min=min_len))
    schema = Schema({Required('subvoat_name'): name_rule})
    return self.try_schema('subvoat_name', subvoat_name, schema)
def uuid(self, uuid):
    """Check a UUID: a ``str`` of exactly 36 characters.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    uuid_rule = All(str, Length(min=36, max=36))
    schema = Schema({Required('uuid'): uuid_rule})
    return self.try_schema('uuid', uuid, schema)
def comment_body(self, comment_body):
    """Check a comment body: a ``str`` within the configured length bounds.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    length_rule = Length(min=self.config['min_length_comment_body'],
                         max=self.config['max_length_comment_body'])
    schema = Schema({Required('comment_body'): All(str, length_rule)})
    return self.try_schema('comment_body', comment_body, schema)
def thread(self, subvoat_name, title, body):
    """Validate the pieces of a new thread and return ``[ok, message]``.

    The subvoat name is checked first, then the title, then the body,
    each against the configured length bounds.  The first failing check
    determines the returned pair; ``[True, None]`` means all passed.
    """
    title_rule = All(str, Length(min=self.config['min_length_thread_title'],
                                 max=self.config['max_length_thread_title']))
    title_schema = Schema({Required('title'): title_rule})

    body_rule = All(str, Length(min=self.config['min_length_thread_body'],
                                max=self.config['max_length_thread_body']))
    body_schema = Schema({Required('body'): body_rule})

    # The subvoat name goes through its own validator method.
    ok, detail = self.subvoat_name(subvoat_name)
    if not ok:
        return [ok, detail]

    # Title first, then body.
    for field, value, schema in (('title', title, title_schema),
                                 ('body', body, body_schema)):
        ok, detail = self.try_schema(field, value, schema)
        if not ok:
            return [ok, detail]

    return [True, None]
def username(self, username):
    """Check a username: a ``str`` within the configured length bounds.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    length_rule = Length(min=self.config['min_length_username'],
                         max=self.config['max_length_username'])
    schema = Schema({Required('username'): All(str, length_rule)})
    return self.try_schema('username', username, schema)
def password(self, password):
    """Check a password: a ``str`` within the configured length bounds.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    length_rule = Length(min=self.config['min_length_password'],
                         max=self.config['max_length_password'])
    schema = Schema({Required('password'): All(str, length_rule)})
    return self.try_schema('password', password, schema)
def api_token(self, api_token):
    """Check an API token: a ``str`` of exactly 36 characters.

    Returns whatever ``self.try_schema`` returns for the check.
    """
    # FIX: make this actually check the token type
    token_rule = All(str, Length(min=36, max=36))
    schema = Schema({Required('api_token'): token_rule})
    return self.try_schema('api_token', api_token, schema)
def __init__(self):
    """Store the validation schema on ``self.schema``.

    Inside ``os``, the keys ``distro``, ``version`` and ``os_type`` are
    all marked as required.
    """
    text = six.text_type

    os_rule = {
        voluptuous.Required('distro'): text,
        voluptuous.Required('version'): text,
        voluptuous.Required('os_type'): text,
    }
    self.schema = voluptuous.Schema({
        'name': text,
        'flavor': text,
        'os': os_rule,
        'metadata': dict,
        'start_date': voluptuous.Datetime(),
        'end_date': voluptuous.Datetime(),
    })
def async_load_config(path: str, hass: HomeAssistantType,
                      consider_home: timedelta):
    """Read the device entries from a YAML file and build Device objects.

    This method is a coroutine.

    The file is loaded through the loop's executor.  Each entry is
    validated; entries that fail validation are reported through
    ``async_log_exception`` and left out.  An empty list is returned when
    the file cannot be loaded.
    """
    nullable_string = vol.Any(None, cv.string)
    dev_schema = vol.Schema({
        vol.Required('name'): cv.string,
        vol.Optional('track', default=False): cv.boolean,
        vol.Optional('mac', default=None):
            vol.Any(None, vol.All(cv.string, vol.Upper)),
        vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean,
        vol.Optional('gravatar', default=None): nullable_string,
        vol.Optional('picture', default=None): nullable_string,
        vol.Optional(CONF_CONSIDER_HOME, default=consider_home):
            vol.All(cv.time_period, cv.positive_timedelta),
        vol.Optional('vendor', default=None): nullable_string,
    })

    loaded = []
    try:
        try:
            devices = yield from hass.loop.run_in_executor(
                None, load_yaml_config_file, path)
        except HomeAssistantError as err:
            _LOGGER.error('Unable to load %s: %s', path, str(err))
            return []

        for dev_id, raw_entry in devices.items():
            try:
                entry = dev_schema(raw_entry)
                entry['dev_id'] = cv.slugify(dev_id)
            except vol.Invalid as exp:
                async_log_exception(exp, dev_id, devices, hass)
                continue
            loaded.append(Device(hass, **entry))
        return loaded
    except (HomeAssistantError, FileNotFoundError):
        # When YAML file could not be loaded/did not contain a dict
        return []