我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用voluptuous.Any()。
def __init__(self): self.redisinstance = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=0) self.validateschema = Schema({ Required(RegistryKeys.SERVICE_ID): Any(str, unicode), Required(RegistryKeys.SERVICE_NAME): Any(str, unicode), Required(RegistryKeys.SERVICE_PORT): int, Required(RegistryKeys.NAMESPACE, default='default'): Any(str, unicode), Required(RegistryKeys.ENDPOINTS): [{ Required(RegistryKeys.URI): Any(str, unicode), Required(RegistryKeys.ACCEPTANCE_REGEX): Any(str, unicode), RegistryKeys.FILTER_REGEX: { Required(RegistryKeys.PATTERN): Any(str, unicode), Required(RegistryKeys.REPLACE): Any(str, unicode) } }] })
def __init__(self, *args, **kwargs): super(ResourceTypeSchemaManager, self).__init__(*args, **kwargs) type_schemas = tuple([ext.plugin.meta_schema() for ext in self.extensions]) self._schema = voluptuous.Schema({ "name": six.text_type, voluptuous.Required("attributes", default={}): { six.text_type: voluptuous.Any(*tuple(type_schemas)) } }) type_schemas = tuple([ext.plugin.meta_schema(for_update=True) for ext in self.extensions]) self._schema_for_update = voluptuous.Schema({ "name": six.text_type, voluptuous.Required("attributes", default={}): { six.text_type: voluptuous.Any(*tuple(type_schemas)) } })
def load_tcconfig(self, config_file_path): import json from voluptuous import Schema, Required, Any, ALLOW_EXTRA schema = Schema({ Required(six.text_type): { Any(*TrafficDirection.LIST): { six.text_type: { six.text_type: Any(six.text_type, int, float) }, } }, }, extra=ALLOW_EXTRA) with open(config_file_path) as fp: self.__config_table = json.load(fp) schema(self.__config_table) self.__logger.debug("tc config file: {:s}".format( json.dumps(self.__config_table, indent=4)))
def boolean(value: Any) -> bool: """Validate and coerce a boolean value.""" if isinstance(value, str): value = value.lower() if value in ('1', 'true', 'yes', 'on', 'enable'): return True if value in ('0', 'false', 'no', 'off', 'disable'): return False raise vol.Invalid('invalid boolean value {}'.format(value)) return bool(value)
def isfile(value: Any) -> str: """Validate that the value is an existing file.""" if value is None: raise vol.Invalid('None is not file') file_in = os.path.expanduser(str(value)) if not os.path.isfile(file_in): raise vol.Invalid('not a file') if not os.access(file_in, os.R_OK): raise vol.Invalid('file not readable') return file_in
def entity_id(value: Any) -> str: """Validate Entity ID.""" value = string(value).lower() if valid_entity_id(value): return value raise vol.Invalid('Entity ID {} is an invalid entity id'.format(value))
def string(value: Any) -> str: """Coerce value to string, except for None.""" if value is not None: return str(value) raise vol.Invalid('string value is None')
def url(value: Any) -> str: """Validate an URL.""" url_in = str(value) if urlparse(url_in).scheme in ['http', 'https']: return vol.Schema(vol.Url())(url_in) raise vol.Invalid('invalid url')
def _add_validation(context): validation_utils = dbt.utils.AttrDict({ 'any': voluptuous.Any, 'all': voluptuous.All, }) return dbt.utils.merge( context, {'validation': validation_utils})
def MetricSchema(v): """metric keyword schema It could be: ["metric", "metric-ref", "aggregation"] or ["metric, ["metric-ref", "aggregation"], ["metric-ref", "aggregation"]] """ if not isinstance(v, (list, tuple)): raise voluptuous.Invalid("Expected a tuple/list, got a %s" % type(v)) elif not v: raise voluptuous.Invalid("Operation must not be empty") elif len(v) < 2: raise voluptuous.Invalid("Operation need at least one argument") elif v[0] != u"metric": # NOTE(sileht): this error message doesn't looks related to "metric", # but because that the last schema validated by voluptuous, we have # good chance (voluptuous.Any is not predictable) to print this # message even if it's an other operation that invalid. raise voluptuous.Invalid("'%s' operation invalid" % v[0]) return [u"metric"] + voluptuous.Schema(voluptuous.Any( voluptuous.ExactSequence([six.text_type, six.text_type]), voluptuous.All( voluptuous.Length(min=1), [voluptuous.ExactSequence([six.text_type, six.text_type])], )), required=True)(v[1:])
def OperationsSchema(v): if isinstance(v, six.text_type): try: v = pyparsing.OneOrMore( pyparsing.nestedExpr()).parseString(v).asList()[0] except pyparsing.ParseException as e: api.abort(400, {"cause": "Invalid operations", "reason": "Fail to parse the operations string", "detail": six.text_type(e)}) return voluptuous.Schema(voluptuous.Any(*OperationsSchemaBase), required=True)(v)
def ResourceSchema(schema): base_schema = { voluptuous.Optional('started_at'): utils.to_datetime, voluptuous.Optional('ended_at'): utils.to_datetime, voluptuous.Optional('user_id'): voluptuous.Any(None, six.text_type), voluptuous.Optional('project_id'): voluptuous.Any(None, six.text_type), voluptuous.Optional('metrics'): MetricsSchema, } base_schema.update(schema) return base_schema
def __init__(self, server): self.server = server self.guild_man = server.guild_man _o = Optional self.guild_edit_schema = Schema({ _o('name'): str, _o('region'): str, _o('verification_level'): int, _o('default_message_notifications'): int, _o('afk_channel_id'): str, _o('afk_timeout'): int, _o('icon'): str, _o('owner_id'): str, }, required=True, extra=REMOVE_EXTRA) self.guild_create_schema = Schema({ 'name': str, 'region': str, 'icon': Any(None, str), 'verification_level': int, 'default_message_notifications': int, }, extra=REMOVE_EXTRA) self.channel_create_schema = Schema({ 'name': All(str, Length(min=2, max=100)), _o('type'): int, _o('bitrate'): int, _o('user_limit'): int, _o('permission_overwrites'): list, }, required=True, extra=REMOVE_EXTRA) self.register()
def async_setup_scanner_platform(hass: HomeAssistantType, config: ConfigType, scanner: Any, async_see_device: Callable): """Helper method to connect scanner-based platform to device tracker. This method is a coroutine. """ interval = config.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) # Initial scan of each mac we also tell about host name for config seen = set() # type: Any def device_tracker_scan(now: dt_util.dt.datetime): """Called when interval matches.""" found_devices = scanner.scan_devices() for mac in found_devices: if mac in seen: host_name = None else: host_name = scanner.get_device_name(mac) seen.add(mac) hass.add_job(async_see_device(mac=mac, host_name=host_name)) async_track_utc_time_change( hass, device_tracker_scan, second=range(0, 60, interval)) hass.async_add_job(device_tracker_scan, None)
def _ResourceSearchSchema(): user = pecan.request.auth_helper.get_current_user( pecan.request) _ResourceUUID = functools.partial(ResourceUUID, creator=user) return voluptuous.Schema( voluptuous.All( voluptuous.Length(min=0, max=1), { voluptuous.Any( u"=", u"==", u"eq", u"<", u"lt", u">", u"gt", u"<=", u"?", u"le", u">=", u"?", u"ge", u"!=", u"?", u"ne", ): voluptuous.All( voluptuous.Length(min=1, max=1), {"id": _ResourceUUID, NotIDKey: ResourceSearchSchemaAttributeValue}, ), u"like": voluptuous.All( voluptuous.Length(min=1, max=1), {NotIDKey: ResourceSearchSchemaAttributeValue}, ), u"in": voluptuous.All( voluptuous.Length(min=1, max=1), {"id": voluptuous.All( [_ResourceUUID], voluptuous.Length(min=1)), NotIDKey: voluptuous.All( [ResourceSearchSchemaAttributeValue], voluptuous.Length(min=1))} ), voluptuous.Any( u"and", u"?", u"or", u"?", ): voluptuous.All( [ResourceSearchSchema], voluptuous.Length(min=1) ), u"not": ResourceSearchSchema, } ) )