我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用voluptuous.Optional()。
def setUp(self): spy = unittest.mock.Mock() class MyClient(Client): attr_name = 'my_clients' @staticmethod def validator(): return { voluptuous.Required('foo'): int, voluptuous.Optional('bar', default='def bar'): str } def __init__(self, **kwargs): spy(kwargs) def close(self): spy('close') self.spy = spy MetaServerTestState.bind_class_client(MyClient) self.addCleanup(lambda: MetaServerTestState.forget_client(MyClient))
def _build_attributes_validator(mcs): """ Returns validator to validate the sub-classes attributes. """ valid_attributes = { Required("commands", 'required class attribute'): [ { Required("name"): str, Required("cmd"): [str], Optional("kill-signal", default=signal.SIGINT): int } ] } for attr_name, class_client in mcs._class_clients.items(): client_validator = { Required("name"): str, } client_validator.update(class_client.validator()) key = Optional(attr_name, 'required class attribute') valid_attributes[key] = [client_validator] return Schema(valid_attributes, extra=ALLOW_EXTRA)
def _get_sensor_state(self, entity_id, remote_states=None): def _opt_float(x: str) -> Optional[float]: try: return float(x) except ValueError: return None value = None if remote_states is not None: value = [_opt_float(s.state) for s in filter(lambda x: x.entity_id == entity_id, remote_states)] if value: value = value[0] else: sensor = self.hass.states.get(entity_id) if sensor is not None: value = _opt_float(sensor.state) return value
def run_information(point_in_time: Optional[datetime]=None): """Return information about current run. There is also the run that covers point_in_time. """ _verify_instance() recorder_runs = get_model('RecorderRuns') if point_in_time is None or point_in_time > _INSTANCE.recording_start: return recorder_runs( end=None, start=_INSTANCE.recording_start, closed_incorrect=False) return query('RecorderRuns').filter( (recorder_runs.start < point_in_time) & (recorder_runs.end > point_in_time)).first()
def __init__(self, data): """ Initialize with preparsed data object. :param ConfigParser data: config file accessor """ self._data = data or {} # self.latitude = None # type: Optional[float] # self.longitude = None # type: Optional[float] # self.elevation = None # type: Optional[int] # self.location_name = None # type: Optional[str] # self.time_zone = None # type: Optional[str] # self.units = METRIC_SYSTEM # type: UnitSystem # # If True, pip install is skipped for requirements on startup # self.skip_pip = False # type: bool # # List of loaded components # self.components = set() # # Remote.API object pointing at local API # self.api = None # # Directory that holds the configuration # self.config_dir = None self._scarlett_name = None self._coordinates = None self._longitude = None self._latitude = None self._pocketsphinx = None self._elevation = None self._unit_system = None self._time_zone = None self._owner_name = None self._keyword_list = None self._features_enabled = None # self._units = METRIC_SYSTEM # type: UnitSystem
def opt(key): """Create an optional key that returns a default of None.""" return vol.Optional(key, default=None)
def validate_config(_config): iptables_sql_schema = voluptuous.Schema({ "module": voluptuous.And(basestring, vu.NoSpaceCharacter()), voluptuous.Optional("db_name"): voluptuous.And( basestring, vu.NoSpaceCharacter()), }, required=True) return iptables_sql_schema(_config)
def validate_config(_config): source_schema = voluptuous.Schema({ "module": voluptuous.And(basestring, vu.NoSpaceCharacter()), "params": { "host": voluptuous.And(basestring, vu.NoSpaceCharacter()), "port": int, "model": { "name": voluptuous.And(basestring, vu.NoSpaceCharacter()), "params": { "origin_types": voluptuous.And([ { "origin_type": voluptuous.And( basestring, vu.NoSpaceCharacter()), "weight": voluptuous.And( voluptuous.Or(int, float), voluptuous.Range( min=0, min_included=False)), } ], vu.NotEmptyArray()), voluptuous.Optional("key_causes"): dict } }, "alerts_per_burst": voluptuous.And( int, voluptuous.Range(min=1)), "idle_time_between_bursts": voluptuous.And( voluptuous.Or(int, float), voluptuous.Range(min=0, min_included=False)) } }, required=True) return source_schema(_config)
def validate_config(_config): base_schema = voluptuous.Schema({ "module": voluptuous.And( basestring, lambda i: not any(c.isspace() for c in i)), voluptuous.Optional("db_name"): voluptuous.And( basestring, lambda i: not any(c.isspace() for c in i)), }, required=True) return base_schema(_config)
def schema(self): if self.required: return {self.name: self.schema_ext} else: return {voluptuous.Optional(self.name): self.schema_ext}
def ResourceSchema(schema): base_schema = { voluptuous.Optional('started_at'): utils.to_datetime, voluptuous.Optional('ended_at'): utils.to_datetime, voluptuous.Optional('user_id'): voluptuous.Any(None, six.text_type), voluptuous.Optional('project_id'): voluptuous.Any(None, six.text_type), voluptuous.Optional('metrics'): MetricsSchema, } base_schema.update(schema) return base_schema
def parse_option(option_string): if 'OPTION_NO_VALUE' in option_string: option = re.findall(r'\"(.*?)\"', option_string)[0] # The options without values seem to still need a value # when used with pilight-daemon, but this are not mandatory # options # E.G.: option 'on' is 'on': 1 return {vol.Optional(option): vol.Coerce(int)} elif 'OPTION_HAS_VALUE' in option_string: options = re.findall(r'\"(.*?)\"', option_string) option = options[0] regex = None if len(options) > 1: # Option has specified value by regex regex = options[1] if 'JSON_NUMBER' in option_string: return {vol.Required(option): vol.Coerce(int)} elif 'JSON_STRING' in option_string: return {vol.Required(option): vol.Coerce(str)} else: raise elif 'OPTION_OPT_VALUE' in option_string: options = re.findall(r'\"(.*?)\"', option_string) option = options[0] regex = None if len(options) > 1: # Option has specified value by regex regex = options[1] if 'JSON_NUMBER' in option_string: return {vol.Required(option): vol.Coerce(int)} elif 'JSON_STRING' in option_string: return {vol.Required(option): vol.Coerce(str)} else: raise else: print(option_string) raise raise
def get_dbt_rh_points(self): """Extract temperature - humidity points from sensors.""" def _mean(values: Union[List[float], Tuple[float]]) -> Optional[float]: if values: try: return sum(values) / len(values) except TypeError: _LOGGER.error('Bad values in mean: %s', values) return None results = yield from self.collect_states() points = {} for key, value in results.items(): if isinstance(value, dict): temp_zone = humid_zone = counter = 0 for k_room, s_values in value.items(): temp = _mean([v[0] for v in s_values]) humid = _mean([v[1] for v in s_values]) if temp is not None and humid is not None: points[k_room] = (int(100 * temp) / 100., int(100 * humid) / 100.) temp_zone += temp humid_zone += humid counter += 1 if counter: points[key] = (int(100 * temp_zone / counter) / 100., int(100 * humid_zone / counter) / 100.) else: temp = _mean([v[0] for v in value]) humid = _mean([v[1] for v in value]) if temp is not None and humid is not None: points[key] = (int(100 * temp) / 100., int(100 * humid) / 100.) return points
def log_error(e: Exception, retry_wait: Optional[float]=0, rollback: Optional[bool]=True, message: Optional[str]="Error during query: %s") -> None: """Log about SQLAlchemy errors in a sane manner.""" import sqlalchemy.exc if not isinstance(e, sqlalchemy.exc.OperationalError): _LOGGER.exception(str(e)) else: _LOGGER.error(message, str(e)) if rollback: Session.rollback() if retry_wait: _LOGGER.info("Retrying in %s seconds", retry_wait) time.sleep(retry_wait)
def async_load_config(path: str, hass: HomeAssistantType, consider_home: timedelta): """Load devices from YAML configuration file. This method is a coroutine. """ dev_schema = vol.Schema({ vol.Required('name'): cv.string, vol.Optional('track', default=False): cv.boolean, vol.Optional('mac', default=None): vol.Any(None, vol.All(cv.string, vol.Upper)), vol.Optional(CONF_AWAY_HIDE, default=DEFAULT_AWAY_HIDE): cv.boolean, vol.Optional('gravatar', default=None): vol.Any(None, cv.string), vol.Optional('picture', default=None): vol.Any(None, cv.string), vol.Optional(CONF_CONSIDER_HOME, default=consider_home): vol.All( cv.time_period, cv.positive_timedelta), vol.Optional('vendor', default=None): vol.Any(None, cv.string), }) try: result = [] try: devices = yield from hass.loop.run_in_executor( None, load_yaml_config_file, path) except HomeAssistantError as err: _LOGGER.error('Unable to load %s: %s', path, str(err)) return [] for dev_id, device in devices.items(): try: device = dev_schema(device) device['dev_id'] = cv.slugify(dev_id) except vol.Invalid as exp: async_log_exception(exp, dev_id, devices, hass) else: result.append(Device(hass, **device)) return result except (HomeAssistantError, FileNotFoundError): # When YAML file could not be loaded/did not contain a dict return []
def _validate_schema(config): """Validate the configuration, with spark, up to the orchestration level Checks that hte spark configuration is valid, as well as the modules structure in the configuration up to the orchestration level. Each module will be responsible to validate its own sub-configuration. :type config: dict :param config: configuration model for the whole system :raises: SchemaError -- if the configuration, up to the orchestration level, is not valid """ config_schema = voluptuous.Schema({ "spark_config": { "appName": basestring, "streaming": { "batch_interval": voluptuous.And(int, voluptuous.Range(min=1)) } }, "server": { "port": int, "debug": bool }, "sources": { voluptuous.Optional(basestring): {basestring: object} }, "ingestors": { voluptuous.Optional(basestring): {basestring: object} }, "smls": { voluptuous.Optional(basestring): {basestring: object} }, "voters": { voluptuous.Optional(basestring): {basestring: object} }, "sinks": { voluptuous.Optional(basestring): {basestring: object} }, "ldps": { voluptuous.Optional(basestring): {basestring: object} }, "connections": { voluptuous.Optional(basestring): [basestring] }, "feedback": { voluptuous.Optional(basestring): [basestring] } }, required=True) return config_schema(config)
def MetricSchema(definition): creator = pecan.request.auth_helper.get_current_user( pecan.request) # First basic validation schema = voluptuous.Schema({ "archive_policy_name": six.text_type, "resource_id": functools.partial(ResourceID, creator=creator), "name": six.text_type, voluptuous.Optional("unit"): voluptuous.All(six.text_type, voluptuous.Length(max=31)), }) definition = schema(definition) archive_policy_name = definition.get('archive_policy_name') name = definition.get('name') if name and '/' in name: abort(400, "'/' is not supported in metric name") if archive_policy_name is None: try: ap = pecan.request.indexer.get_archive_policy_for_metric(name) except indexer.NoArchivePolicyRuleMatch: # NOTE(jd) Since this is a schema-like function, we # should/could raise ValueError, but if we do so, voluptuous # just returns a "invalid value" with no useful message – so we # prefer to use abort() to make sure the user has the right # error message abort(400, "No archive policy name specified " "and no archive policy rule found matching " "the metric name %s" % name) else: definition['archive_policy_name'] = ap.name resource_id = definition.get('resource_id') if resource_id is None: original_resource_id = None else: if name is None: abort(400, {"cause": "Attribute value error", "detail": "name", "reason": "Name cannot be null " "if resource_id is not null"}) original_resource_id, resource_id = resource_id enforce("create metric", { "creator": creator, "archive_policy_name": archive_policy_name, "resource_id": resource_id, "original_resource_id": original_resource_id, "name": name, "unit": definition.get('unit'), }) return definition