我们从 Python 开源项目中，提取了以下 29 个代码示例，用于说明如何使用 voluptuous.All()。
def validate(self):
    """Run ``self._data`` through the voucher schema.

    The schema requires issue date, supplier, customer, voucher type,
    currency, voucher number and a non-empty list of lines.  Unknown keys
    are tolerated at every level (``ALLOW_EXTRA``).  Nothing is returned;
    any error raised by the schema call propagates to the caller.
    """
    # Supplier RUC must be exactly 11 characters long.
    supplier_fields = {
        Required('ruc'): All(str, Length(min=11, max=11)),
        Required('registration_name'): str,
        Optional('address'): dict,
        Optional('commercial_name'): str,
    }
    supplier = Schema(supplier_fields, extra=ALLOW_EXTRA)

    # Customer RUC may be anything from 1 to 11 characters.
    customer_fields = {
        Required('ruc'): All(str, Length(min=1, max=11)),
    }
    customer = Schema(customer_fields, extra=ALLOW_EXTRA)

    voucher_fields = {
        Required('issue_date'): str,
        Required('supplier'): supplier,
        Required('customer'): customer,
        Required('voucher_type'): str,
        Required('currency'): str,
        Required('voucher_number'): str,
        Required('lines'): All(list, Length(min=1)),
    }
    voucher = Schema(voucher_fields, extra=ALLOW_EXTRA)
    voucher(self._data)
def get_pagination_options(params, default):
    """Validate pagination query parameters and return them normalised.

    ``limit`` defaults to the configured API maximum, is coerced to an int,
    must be at least 1 and is clamped to the maximum.  ``marker`` is an
    optional text value.  ``sort`` defaults to *default* and is coerced to a
    list of text values.  Unknown keys are dropped.  On a validation error
    the request is aborted with a 400.  In the returned mapping the sort
    list is stored under ``sorts`` rather than ``sort``.
    """
    try:
        limit_key = voluptuous.Required(
            "limit", default=pecan.request.conf.api.max_limit)
        limit_rule = voluptuous.All(
            voluptuous.Coerce(int),
            voluptuous.Range(min=1),
            voluptuous.Clamp(min=1, max=pecan.request.conf.api.max_limit),
        )
        sort_key = voluptuous.Required("sort", default=default)
        sort_rule = voluptuous.All(
            voluptuous.Coerce(arg_to_list), [six.text_type])
        pagination_schema = voluptuous.Schema({
            limit_key: limit_rule,
            "marker": six.text_type,
            sort_key: sort_rule,
        }, extra=voluptuous.REMOVE_EXTRA)
        opts = pagination_schema(params)
    except voluptuous.Invalid as e:
        abort(400, {"cause": "Argument value error",
                    "reason": str(e)})

    # Callers expect the plural key.
    opts['sorts'] = opts.pop('sort')
    return opts
def get(self):
    """Return the posts of the subvoat named in the request.

    Reads the ``subvoat_name`` request argument and checks it is a string
    of at least the configured minimum length.  Returns
    ``{'error': ...}`` when validation fails, otherwise
    ``{'result': [...]}`` holding every post yielded by the subvoat
    utilities.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('subvoat_name')
    args = parser.parse_args()

    name_rule = All(
        str, Length(min=self.config['min_length_subvoat_name']))
    schema = Schema({Required('subvoat_name'): name_rule})

    try:
        schema({'subvoat_name': args.get('subvoat_name')})
    except MultipleInvalid as e:
        return {'error': '%s %s' % (e.msg, e.path)}

    posts = self.subvoat_utils.get_posts(args['subvoat_name'])
    return {'result': list(posts)}
def page(self, start, end):
    """Validate a ``start``/``end`` page range.

    Both bounds must be non-negative integers, ``end`` must not be lower
    than ``start`` and the span may not exceed 50 pages.

    Returns a two-item list ``[ok, detail]``: ``[True, None]`` on success,
    otherwise the failing status together with its explanation.
    """
    non_negative_int = All(int, Range(min=0))

    # Check each bound on its own first, start before end.
    for field, value in (('start', start), ('end', end)):
        field_schema = Schema({Required(field): non_negative_int})
        ok, detail = self.try_schema(field, value, field_schema)
        if not ok:
            return [ok, detail]

    if end < start:
        return [False, 'ending page cannot be lower than starting page']

    if end - start > 50:
        return [False, 'you cannot request more than 50 pages']

    return [True, None]
def __init__(self, server):
    """Keep references to the server and build the channel edit schemas.

    A shared base schema (name, position, optional nsfw flag) is extended
    once for text channels (topic) and once for voice channels (bitrate
    and user limit).  ``self.register()`` is called last.
    """
    self.server = server
    self.guild_man = server.guild_man

    common_fields = {
        'name': All(str, Length(min=2, max=100)),
        'position': int,
        Optional('nsfw'): bool,
    }
    self.channel_edit_base = Schema(common_fields, required=True)

    text_fields = {
        'topic': All(str, Length(min=0, max=1024)),
    }
    self.textchan_editschema = self.channel_edit_base.extend(text_fields)

    voice_fields = {
        'bitrate': All(int, Range(min=8000, max=96000)),
        'user_limit': All(int, Range(min=0, max=99)),
    }
    self.voicechan_editschema = self.channel_edit_base.extend(voice_fields)

    self.register()
def enum(enumClass):
    """Build a validator that maps a member name to its enum member.

    The value must first be one of ``enumClass.__members__``; it is then
    looked up with ``enumClass[...]``.
    """
    is_member_name = vol.In(enumClass.__members__)
    to_member = enumClass.__getitem__
    return vol.All(is_member_name, to_member)
def state_variables(self):
    """Return the state variables stored on this service."""
    variables = self._state_variables
    return variables
def actions(self):
    """Return the actions stored on this service."""
    known_actions = self._actions
    return known_actions
def _state_variable_create_schema(self, type_info):
    """Build the voluptuous schema for a state variable's ``value``.

    ``type_info`` must provide ``data_type_python`` (the Python type used
    both as validator and for coercion) and may provide:

    * ``allowed_values`` - the value must be one of these;
    * ``allowed_value_range`` - mapping with optional ``min`` / ``max``;
    * ``default_value`` - string form of the default (``'1'`` means True
      for booleans).

    Returns a ``vol.Schema`` for a mapping with a required ``value`` key.
    """
    data_type = type_info['data_type_python']
    validators = [data_type]

    if 'allowed_values' in type_info:
        # Allowed values are not coerced; assumed to be strings for now.
        validators.append(vol.In(type_info['allowed_values']))

    if 'allowed_value_range' in type_info:
        value_range = type_info['allowed_value_range']
        min_ = value_range.get('min')
        max_ = value_range.get('max')
        # Only coerce the bounds that are actually present: coercing a
        # missing bound would raise (int(None)) or yield garbage
        # (str(None) == 'None').  ``None`` means "unbounded" for Range.
        if min_ is not None:
            min_ = data_type(min_)
        if max_ is not None:
            max_ = data_type(max_)
        validators.append(vol.Range(min=min_, max=max_))

    if 'default_value' in type_info:
        default_value = type_info['default_value']
        if data_type is bool:
            default_value = default_value == '1'
        else:
            default_value = data_type(default_value)
        # Pass the default through the marker's constructor so voluptuous
        # wraps it the way it expects, instead of poking the attribute.
        key = vol.Required('value', default=default_value)
    else:
        key = vol.Required('value')

    return vol.Schema({key: vol.All(*validators)})
def _add_validation(context):
    """Return *context* merged with a ``validation`` helper namespace.

    The namespace exposes voluptuous' ``Any`` and ``All`` under the keys
    ``any`` and ``all``.
    """
    helpers = {
        'any': voluptuous.Any,
        'all': voluptuous.All,
    }
    validation_utils = dbt.utils.AttrDict(helpers)
    extra_context = {'validation': validation_utils}
    return dbt.utils.merge(context, extra_context)
def schema_ext(self):
    """Return a validator for text bounded by this attribute's lengths.

    The value must be a text string whose length lies between
    ``self.min_length`` and ``self.max_length``.
    """
    length_rule = voluptuous.Length(min=self.min_length,
                                    max=self.max_length)
    return voluptuous.All(six.text_type, length_rule)
def schema_ext(self):
    """Return a validator for real numbers bounded by this attribute.

    The value must be a ``numbers.Real`` lying between ``self.min`` and
    ``self.max``.
    """
    range_rule = voluptuous.Range(min=self.min, max=self.max)
    return voluptuous.All(numbers.Real, range_rule)
def MetricSchema(v):
    """Validate a ``metric`` operation.

    Accepted shapes are::

        ["metric", "metric-ref", "aggregation"]

    or::

        ["metric", ["metric-ref", "aggregation"],
                   ["metric-ref", "aggregation"]]

    Returns the validated operation as a list starting with ``"metric"``.
    Raises ``voluptuous.Invalid`` when the shape is wrong.
    """
    if not isinstance(v, (list, tuple)):
        raise voluptuous.Invalid("Expected a tuple/list, got a %s" % type(v))
    if not v:
        raise voluptuous.Invalid("Operation must not be empty")
    if len(v) < 2:
        raise voluptuous.Invalid("Operation need at least one argument")
    if v[0] != u"metric":
        # NOTE(sileht): this message does not look related to "metric", but
        # since this is the last schema voluptuous validates there is a
        # good chance (voluptuous.Any is not predictable) that it is the
        # one shown even when a different operation is the invalid one.
        raise voluptuous.Invalid("'%s' operation invalid" % v[0])

    single_reference = voluptuous.ExactSequence(
        [six.text_type, six.text_type])
    reference_list = voluptuous.All(
        voluptuous.Length(min=1),
        [voluptuous.ExactSequence([six.text_type, six.text_type])],
    )
    arguments_schema = voluptuous.Schema(
        voluptuous.Any(single_reference, reference_list),
        required=True)

    return [u"metric"] + arguments_schema(v[1:])
def patch(self):
    """Replace the definition of this archive policy.

    Aborts with 404 when the policy is not found, enforces the
    "update archive policy" rule, then validates the request body: it must
    carry a non-empty ``definition`` list whose items may hold
    ``granularity``, ``points`` and ``timespan``.  Aborts with 400 when an
    item cannot be turned into an ``ArchivePolicyItem`` or when the indexer
    reports an unsupported change.  Returns the indexer's update result.
    """
    ap = pecan.request.indexer.get_archive_policy(self.archive_policy)
    if not ap:
        not_found = indexer.NoSuchArchivePolicy(self.archive_policy)
        abort(404, six.text_type(not_found))
    enforce("update archive policy", ap)

    item_schema = {
        "granularity": Timespan,
        "points": PositiveNotNullInt,
        "timespan": Timespan,
    }
    body_schema = voluptuous.Schema({
        voluptuous.Required("definition"):
            voluptuous.All([item_schema], voluptuous.Length(min=1)),
    })
    body = deserialize_and_validate(body_schema)

    # Validate the data
    try:
        ap_items = [archive_policy.ArchivePolicyItem(**item)
                    for item in body['definition']]
    except ValueError as e:
        abort(400, six.text_type(e))

    try:
        return pecan.request.indexer.update_archive_policy(
            self.archive_policy, ap_items)
    except indexer.UnsupportedArchivePolicyChange as e:
        abort(400, six.text_type(e))
def subvoat_name(self, subvoat_name):
    """Validate a subvoat name.

    The name must be a string of at least the configured minimum length.
    Returns whatever ``self.try_schema`` returns for it.
    """
    shortest = self.config['min_length_subvoat_name']
    name_rule = All(str, Length(min=shortest))
    schema = Schema({Required('subvoat_name'): name_rule})
    return self.try_schema('subvoat_name', subvoat_name, schema)
def uuid(self, uuid):
    """Validate a UUID string.

    The value must be a string of exactly 36 characters; its content is
    not inspected.  Returns whatever ``self.try_schema`` returns for it.
    """
    exactly_36_chars = All(str, Length(min=36, max=36))
    schema = Schema({Required('uuid'): exactly_36_chars})
    return self.try_schema('uuid', uuid, schema)
def comment_body(self, comment_body):
    """Validate a comment body.

    The body must be a string whose length lies within the configured
    minimum and maximum.  Returns whatever ``self.try_schema`` returns.
    """
    length_rule = Length(min=self.config['min_length_comment_body'],
                         max=self.config['max_length_comment_body'])
    schema = Schema({Required('comment_body'): All(str, length_rule)})
    return self.try_schema('comment_body', comment_body, schema)
def thread(self, subvoat_name, title, body):
    """Validate everything needed to create a thread.

    Checks, in order, the subvoat name, the thread title and the thread
    body; title and body must be strings within their configured length
    limits.  Returns ``[status, detail]`` for the first failing check, or
    ``[True, None]`` when all of them pass.
    """
    title_rule = All(str, Length(min=self.config['min_length_thread_title'],
                                 max=self.config['max_length_thread_title']))
    title_schema = Schema({Required('title'): title_rule})

    body_rule = All(str, Length(min=self.config['min_length_thread_body'],
                                max=self.config['max_length_thread_body']))
    body_schema = Schema({Required('body'): body_rule})

    # The subvoat name goes first.
    ok, detail = self.subvoat_name(subvoat_name)
    if not ok:
        return [ok, detail]

    # Then the title, and finally the body.
    remaining_checks = (
        ('title', title, title_schema),
        ('body', body, body_schema),
    )
    for field, value, schema in remaining_checks:
        ok, detail = self.try_schema(field, value, schema)
        if not ok:
            return [ok, detail]

    # Everything is ok.
    return [True, None]
def username(self, username):
    """Validate a username.

    The username must be a string whose length lies within the configured
    minimum and maximum.  Returns whatever ``self.try_schema`` returns.
    """
    length_rule = Length(min=self.config['min_length_username'],
                         max=self.config['max_length_username'])
    schema = Schema({Required('username'): All(str, length_rule)})
    return self.try_schema('username', username, schema)
def password(self, password):
    """Validate a password.

    The password must be a string whose length lies within the configured
    minimum and maximum.  Returns whatever ``self.try_schema`` returns.
    """
    length_rule = Length(min=self.config['min_length_password'],
                         max=self.config['max_length_password'])
    schema = Schema({Required('password'): All(str, length_rule)})
    return self.try_schema('password', password, schema)
def api_token(self, api_token):
    """Validate an API token.

    The token must be a string of exactly 36 characters.  Returns whatever
    ``self.try_schema`` returns for it.
    """
    # FIX: make this actually check the token type
    exactly_36_chars = All(str, Length(min=36, max=36))
    schema = Schema({Required('api_token'): exactly_36_chars})
    return self.try_schema('api_token', api_token, schema)
def get(self, request):
    """Handle the GET request that carries the authorization result.

    The query string must contain both ``code`` and ``state``.  When
    either is missing a "bad request" JSON message is returned; otherwise
    both values are handed to ``self.lyric.authorization_code`` and a
    confirmation message is returned.
    """
    data = request.GET

    if 'code' not in data or 'state' not in data:
        return self.json_message('Authentication failed, not the right '
                                 'variables, try again.', HTTP_BAD_REQUEST)

    self.lyric.authorization_code(code=data['code'], state=data['state'])

    # Adjacent string literals are joined as-is, so each fragment has to
    # end with its own separating space.
    return self.json_message('OK! All good. Got the response! You can close '
                             'this window now, and click << Continue >> '
                             'in the configurator.')
def __init__(self, server):
    """Keep references to the server and build the guild-related schemas.

    Three schemas are created: guild edit (every field optional), guild
    create, and channel create.  All of them drop unknown keys.
    ``self.register()`` is called last.
    """
    self.server = server
    self.guild_man = server.guild_man

    guild_edit_fields = {
        Optional('name'): str,
        Optional('region'): str,
        Optional('verification_level'): int,
        Optional('default_message_notifications'): int,
        Optional('afk_channel_id'): str,
        Optional('afk_timeout'): int,
        Optional('icon'): str,
        Optional('owner_id'): str,
    }
    self.guild_edit_schema = Schema(
        guild_edit_fields, required=True, extra=REMOVE_EXTRA)

    guild_create_fields = {
        'name': str,
        'region': str,
        'icon': Any(None, str),
        'verification_level': int,
        'default_message_notifications': int,
    }
    self.guild_create_schema = Schema(
        guild_create_fields, extra=REMOVE_EXTRA)

    channel_create_fields = {
        'name': All(str, Length(min=2, max=100)),
        Optional('type'): int,
        Optional('bitrate'): int,
        Optional('user_limit'): int,
        Optional('permission_overwrites'): list,
    }
    self.channel_create_schema = Schema(
        channel_create_fields, required=True, extra=REMOVE_EXTRA)

    self.register()
def __init__(self, server):
    """Keep references to the server and build the invite creation schema.

    Every field has a default: ``max_age`` 86400 (allowed 10..86400),
    ``max_uses`` 0 (allowed 0..50), ``temporary`` False and ``unique``
    True.  Unknown keys are dropped.  ``self.register(server.app)`` is
    called last.
    """
    self.server = server
    self.guild_man = server.guild_man

    max_age_rule = All(int, Range(min=10, max=86400))
    max_uses_rule = All(int, Range(min=0, max=50))
    invite_fields = {
        Required('max_age', default=86400): max_age_rule,
        Required('max_uses', default=0): max_uses_rule,
        Required('temporary', default=False): bool,
        Required('unique', default=True): bool,
    }
    self.invite_create_schema = Schema(invite_fields, extra=REMOVE_EXTRA)

    self.register(server.app)
def async_load_config(path: str, hass: HomeAssistantType,
                      consider_home: timedelta):
    """Load devices from a YAML configuration file.

    The file is read in an executor.  Each entry is validated against the
    device schema; invalid entries are logged and skipped, valid ones are
    turned into ``Device`` objects.  An empty list is returned when the
    file cannot be loaded.

    This method is a coroutine.
    """
    optional_string = vol.Any(None, cv.string)
    dev_schema = vol.Schema({
        vol.Required('name'): cv.string,
        vol.Optional('track', default=False): cv.boolean,
        vol.Optional('mac', default=None):
            vol.Any(None, vol.All(cv.string, vol.Upper)),
        vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean,
        vol.Optional('gravatar', default=None): optional_string,
        vol.Optional('picture', default=None): vol.Any(None, cv.string),
        vol.Optional(CONF_CONSIDER_HOME, default=consider_home):
            vol.All(cv.time_period, cv.positive_timedelta),
        vol.Optional('vendor', default=None): vol.Any(None, cv.string),
    })

    try:
        loaded = []
        try:
            devices = yield from hass.loop.run_in_executor(
                None, load_yaml_config_file, path)
        except HomeAssistantError as err:
            _LOGGER.error('Unable to load %s: %s', path, str(err))
            return []

        for dev_id, raw_device in devices.items():
            try:
                validated = dev_schema(raw_device)
                validated['dev_id'] = cv.slugify(dev_id)
            except vol.Invalid as exp:
                async_log_exception(exp, dev_id, devices, hass)
            else:
                loaded.append(Device(hass, **validated))
        return loaded
    except (HomeAssistantError, FileNotFoundError):
        # When YAML file could not be loaded/did not contain a dict
        return []
def MetricSchema(definition):
    """Validate a metric definition and enforce the creation policy.

    After a basic structural validation the function:

    * rejects metric names containing ``/``;
    * when no archive policy name is given, looks one up from the archive
      policy rules matching the metric name and stores it in the
      definition;
    * requires a name whenever a resource id is given;
    * enforces the "create metric" rule.

    Returns the validated (and possibly completed) definition.  Problems
    are reported through ``abort(400, ...)``.
    """
    creator = pecan.request.auth_helper.get_current_user(pecan.request)

    # First basic validation
    unit_rule = voluptuous.All(six.text_type, voluptuous.Length(max=31))
    basic_schema = voluptuous.Schema({
        "archive_policy_name": six.text_type,
        "resource_id": functools.partial(ResourceID, creator=creator),
        "name": six.text_type,
        voluptuous.Optional("unit"): unit_rule,
    })
    definition = basic_schema(definition)

    archive_policy_name = definition.get('archive_policy_name')
    name = definition.get('name')

    if name and '/' in name:
        abort(400, "'/' is not supported in metric name")

    if archive_policy_name is None:
        try:
            ap = pecan.request.indexer.get_archive_policy_for_metric(name)
        except indexer.NoArchivePolicyRuleMatch:
            # NOTE(jd) Since this is a schema-like function, we
            # should/could raise ValueError, but if we do so, voluptuous
            # just returns an "invalid value" with no useful message, so
            # we prefer abort() to make sure the user gets the right
            # error message.
            abort(400, "No archive policy name specified "
                       "and no archive policy rule found matching "
                       "the metric name %s" % name)
        else:
            # NOTE(review): only the definition is updated here; the local
            # ``archive_policy_name`` passed to enforce() below stays None.
            # Confirm whether that is intended.
            definition['archive_policy_name'] = ap.name

    original_resource_id = None
    resource_id = definition.get('resource_id')
    if resource_id is not None:
        if name is None:
            abort(400,
                  {"cause": "Attribute value error",
                   "detail": "name",
                   "reason": "Name cannot be null "
                             "if resource_id is not null"})
        # The validated resource id is a pair: (original, internal).
        original_resource_id, resource_id = resource_id

    enforce("create metric", {
        "creator": creator,
        "archive_policy_name": archive_policy_name,
        "resource_id": resource_id,
        "original_resource_id": original_resource_id,
        "name": name,
        "unit": definition.get('unit'),
    })

    return definition
def _ResourceSearchSchema():
    """Build the schema that validates a resource search query.

    A query is a mapping with at most one key, the operator:

    * a comparison operator mapping exactly one attribute to a value
      (``id`` values are validated as resource UUIDs of the current user);
    * ``like`` mapping exactly one non-id attribute to a value;
    * ``in`` mapping exactly one attribute to a non-empty list of values;
    * a logical ``and`` / ``or`` operator with a non-empty list of
      sub-queries;
    * ``not`` with a single sub-query.

    Each operator is accepted in its ASCII, word and Unicode-symbol form.
    """
    user = pecan.request.auth_helper.get_current_user(pecan.request)
    _ResourceUUID = functools.partial(ResourceUUID, creator=user)

    # The Unicode aliases are written as escapes so they survive any
    # source-file encoding (they had been degraded to a literal "?").
    comparison_operators = voluptuous.Any(
        u"=", u"==", u"eq",
        u"<", u"lt",
        u">", u"gt",
        u"<=", u"\u2264", u"le",   # LESS-THAN OR EQUAL TO
        u">=", u"\u2265", u"ge",   # GREATER-THAN OR EQUAL TO
        u"!=", u"\u2260", u"ne",   # NOT EQUAL TO
    )
    logical_operators = voluptuous.Any(
        u"and", u"\u2227",         # LOGICAL AND
        u"or", u"\u2228",          # LOGICAL OR
    )

    return voluptuous.Schema(
        voluptuous.All(
            voluptuous.Length(min=0, max=1),
            {
                comparison_operators: voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {"id": _ResourceUUID,
                     NotIDKey: ResourceSearchSchemaAttributeValue},
                ),
                u"like": voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {NotIDKey: ResourceSearchSchemaAttributeValue},
                ),
                u"in": voluptuous.All(
                    voluptuous.Length(min=1, max=1),
                    {"id": voluptuous.All(
                        [_ResourceUUID],
                        voluptuous.Length(min=1)),
                     NotIDKey: voluptuous.All(
                         [ResourceSearchSchemaAttributeValue],
                         voluptuous.Length(min=1))}
                ),
                logical_operators: voluptuous.All(
                    [ResourceSearchSchema], voluptuous.Length(min=1)
                ),
                u"not": ResourceSearchSchema,
            }
        )
    )
def async_run(self, variables: Optional[Sequence]=None) -> None:
    """Run the script, resuming from the stored position if there is one.

    Actions are processed in order starting at ``self._cur``.  A delay
    action schedules this method to be called again later and returns; a
    failing condition stops the run; event actions fire an event; anything
    else is treated as a service call.

    This method is a coroutine.
    """
    if self._cur == -1:
        self._log('Running script')
        self._cur = 0

    # Unregister callback if we were in a delay but turn on is called
    # again. In that case we just continue execution.
    self._async_remove_listener()

    pending_actions = islice(enumerate(self.sequence), self._cur, None)
    for cur, action in pending_actions:

        if CONF_DELAY in action:
            # Call ourselves in the future to continue work
            @asyncio.coroutine
            def script_delay(now):
                """Called after delay is done."""
                self._async_unsub_delay_listener = None
                self.hass.async_add_job(self.async_run(variables))

            delay = action[CONF_DELAY]

            if isinstance(delay, template.Template):
                as_positive_period = vol.All(
                    cv.time_period, cv.positive_timedelta)
                delay = as_positive_period(delay.async_render(variables))

            resume_at = date_util.utcnow() + delay
            self._async_unsub_delay_listener = \
                async_track_point_in_utc_time(
                    self.hass, script_delay, resume_at)

            # Remember where to pick up once the delay has elapsed.
            self._cur = cur + 1
            if self._change_listener:
                self.hass.async_add_job(self._change_listener)
            return

        if CONF_CONDITION in action:
            if not self._async_check_condition(action, variables):
                break
            continue

        if CONF_EVENT in action:
            self._async_fire_event(action)
            continue

        yield from self._async_call_service(action, variables)

    # Finished (or stopped by a condition): reset the run state.
    self._cur = -1
    self.last_action = None
    if self._change_listener:
        self.hass.async_add_job(self._change_listener)