我们从Python开源项目中，提取了以下17个代码示例，用于说明如何使用voluptuous.Length()。
def test_raise_invalid_if_provisioner_schema_is_not_satisfied(self, mock_Provisioner):
    """The schema must reject a provisioning entry that breaks its provisioner's rules.

    Three mock provisioners are registered on ``mock_Provisioner``; the second
    entry of the ``provisioning`` list carries an ``a`` value that is expected
    to be refused, so the reported error path has to point at
    ``['provisioning'][1]['a']``.
    """
    mock_Provisioner.provisioners = {
        'mp1': MockProvisioner1,
        'mp2': MockProvisioner2,
        'mp3': MockProvisioner3,
    }
    schema = get_schema()

    with pytest.raises(Invalid) as e:
        schema({
            'name': 'dummy-test',
            'provisioning': [{
                'type': 'mp1',
                'a': 'dummy',
                'b': '16'
            }, {
                'type': 'mp2',
                'a': 'dummydummy',  # Exceeds Length(min=5, max=5)
            }, {
                'type': 'mp3',
                'b': 'yes'
            }]
        })

    # ``e`` is pytest's ExceptionInfo wrapper, not the exception. Its string
    # form is not guaranteed to contain the validation message, so inspect
    # the raised exception itself through ``e.value``.
    assert "['provisioning'][1]['a']" in str(e.value)
def validate(self):
    """Run ``self._data`` through the voucher schema.

    The top-level document needs an issue date, supplier, customer, voucher
    type, currency, voucher number and a non-empty ``lines`` list.  The
    supplier ``ruc`` must be an 11-character string; the customer ``ruc`` a
    string of 1 to 11 characters.  Unknown keys are allowed at every level.
    Nothing is returned; any validation error is left to propagate from
    the schema call.
    """
    supplier_ruc = All(str, Length(min=11, max=11))
    customer_ruc = All(str, Length(min=1, max=11))
    non_empty_list = All(list, Length(min=1))

    supplier_schema = Schema(
        {
            Required('ruc'): supplier_ruc,
            Required('registration_name'): str,
            Optional('address'): dict,
            Optional('commercial_name'): str,
        },
        extra=ALLOW_EXTRA,
    )
    customer_schema = Schema({Required('ruc'): customer_ruc}, extra=ALLOW_EXTRA)

    voucher_schema = Schema(
        {
            Required('issue_date'): str,
            Required('supplier'): supplier_schema,
            Required('customer'): customer_schema,
            Required('voucher_type'): str,
            Required('currency'): str,
            Required('voucher_number'): str,
            Required('lines'): non_empty_list,
        },
        extra=ALLOW_EXTRA,
    )
    voucher_schema(self._data)
def get(self):
    """Return the posts of the subvoat named in the request arguments.

    The ``subvoat_name`` argument is parsed with a ``reqparse`` parser and
    must be a string of at least ``config['min_length_subvoat_name']``
    characters.  On a validation failure an ``{'error': ...}`` dict built
    from the exception's message and path is returned; otherwise the posts
    from ``self.subvoat_utils.get_posts`` are returned under ``'result'``.
    """
    request_parser = reqparse.RequestParser()
    request_parser.add_argument('subvoat_name')
    args = request_parser.parse_args()

    name_rule = All(str, Length(min=self.config['min_length_subvoat_name']))
    name_schema = Schema({Required('subvoat_name'): name_rule})

    try:
        name_schema({'subvoat_name': args.get('subvoat_name')})
    except MultipleInvalid as e:
        return {'error': '%s %s' % (e.msg, e.path)}

    # Materialise the posts into a plain list for the response.
    return {'result': list(self.subvoat_utils.get_posts(args['subvoat_name']))}
def __init__(self, server):
    """Store the server handles, build the channel edit schemas, then register.

    ``channel_edit_base`` holds the fields shared by every channel edit
    (``name`` of 2-100 characters, integer ``position``, optional ``nsfw``
    flag).  The text and voice schemas extend it with ``topic`` and with
    ``bitrate``/``user_limit`` respectively.  ``self.register()`` is called
    last, after all schemas exist.
    """
    self.server = server
    self.guild_man = server.guild_man

    shared_fields = {
        'name': All(str, Length(min=2, max=100)),
        'position': int,
        Optional('nsfw'): bool,
    }
    self.channel_edit_base = Schema(shared_fields, required=True)

    text_fields = {
        'topic': All(str, Length(min=0, max=1024)),
    }
    voice_fields = {
        'bitrate': All(int, Range(min=8000, max=96000)),
        'user_limit': All(int, Range(min=0, max=99)),
    }
    self.textchan_editschema = self.channel_edit_base.extend(text_fields)
    self.voicechan_editschema = self.channel_edit_base.extend(voice_fields)

    self.register()
def schema_ext(self):
    """Return a validator for text whose length is within this object's bounds.

    The value must be of type ``six.text_type`` and its length must satisfy
    ``self.min_length`` and ``self.max_length``.
    """
    length_bounds = voluptuous.Length(min=self.min_length, max=self.max_length)
    return voluptuous.All(six.text_type, length_bounds)
def MetricSchema(v):
    """Validate a "metric" operation and return it as a list.

    Accepted shapes::

        ["metric", "metric-ref", "aggregation"]
        ["metric", ["metric-ref", "aggregation"], ["metric-ref", "aggregation"]]

    :param v: the operation, a list or tuple whose first item is ``"metric"``.
    :returns: a new list ``["metric", <validated operands...>]``.
    :raises voluptuous.Invalid: if ``v`` is not a list/tuple, is empty, has
        no operand, does not start with ``"metric"``, or its operands do not
        match one of the accepted shapes.
    """
    if not isinstance(v, (list, tuple)):
        raise voluptuous.Invalid("Expected a tuple/list, got a %s" % type(v))
    elif not v:
        raise voluptuous.Invalid("Operation must not be empty")
    elif len(v) < 2:
        raise voluptuous.Invalid("Operation need at least one argument")
    elif v[0] != u"metric":
        # NOTE(sileht): this error message does not look related to "metric",
        # but because this is the last schema validated by voluptuous, there
        # is a good chance (voluptuous.Any is not predictable) that this
        # message is printed even when a different operation is invalid.
        raise voluptuous.Invalid("'%s' operation invalid" % v[0])

    metric_ref = voluptuous.ExactSequence([six.text_type, six.text_type])
    operands = voluptuous.Schema(voluptuous.Any(
        metric_ref,
        voluptuous.All(
            voluptuous.Length(min=1),
            [voluptuous.ExactSequence([six.text_type, six.text_type])],
        )), required=True)(v[1:])

    # Tuples are accepted above, and ExactSequence hands back the same
    # container type it was given.  Normalise to a list so that the
    # concatenation cannot fail with "can only concatenate list to list".
    return [u"metric"] + list(operands)
def patch(self):
    """Replace the definition of the archive policy named ``self.archive_policy``.

    Looks the policy up through the indexer (404 via ``abort`` when it is
    missing), enforces the "update archive policy" rule on it, validates the
    request body (a non-empty ``definition`` list of granularity / points /
    timespan items), turns each item into an ``ArchivePolicyItem`` and asks
    the indexer to apply the update.  A ``ValueError`` while building the
    items, or an ``UnsupportedArchivePolicyChange`` from the indexer, is
    reported with ``abort(400, ...)``.
    """
    policy = pecan.request.indexer.get_archive_policy(self.archive_policy)
    if not policy:
        not_found = indexer.NoSuchArchivePolicy(self.archive_policy)
        abort(404, six.text_type(not_found))
    enforce("update archive policy", policy)

    definition_item = {
        "granularity": Timespan,
        "points": PositiveNotNullInt,
        "timespan": Timespan,
    }
    patch_schema = voluptuous.Schema({
        voluptuous.Required("definition"):
            voluptuous.All([definition_item], voluptuous.Length(min=1)),
    })
    payload = deserialize_and_validate(patch_schema)

    # Validate the data
    try:
        new_items = [archive_policy.ArchivePolicyItem(**entry)
                     for entry in payload['definition']]
    except ValueError as e:
        abort(400, six.text_type(e))

    try:
        return pecan.request.indexer.update_archive_policy(
            self.archive_policy, new_items)
    except indexer.UnsupportedArchivePolicyChange as e:
        abort(400, six.text_type(e))
def post(self):
    """Create an archive policy from the request body and return it.

    The "create archive policy" rule is enforced twice: once up front with
    an empty target, and again with the parsed policy.  The body must carry
    a ``name`` and a non-empty ``definition`` list; ``back_window`` defaults
    to 0 and ``aggregation_methods`` defaults to the configured set.  A
    ``ValueError`` from ``ArchivePolicy.from_dict`` is reported with
    ``abort(400, ...)`` and an already existing policy with
    ``abort(409, ...)``.  On success the location header is set, the
    response status becomes 201 and the created policy is returned.
    """
    enforce("create archive policy", {})

    # NOTE(jd): Initialize this one at run-time because we rely on conf
    conf = pecan.request.conf
    allowed_methods = (
        archive_policy.ArchivePolicy.VALID_AGGREGATION_METHODS_VALUES
    )
    default_methods = set(conf.archive_policy.default_aggregation_methods)
    definition_item = {
        "granularity": Timespan,
        "points": PositiveNotNullInt,
        "timespan": Timespan,
    }

    ArchivePolicySchema = voluptuous.Schema({
        voluptuous.Required("name"): six.text_type,
        voluptuous.Required("back_window", default=0): PositiveOrNullInt,
        voluptuous.Required("aggregation_methods", default=default_methods):
            voluptuous.All(list(allowed_methods), voluptuous.Coerce(set)),
        voluptuous.Required("definition"):
            voluptuous.All([definition_item], voluptuous.Length(min=1)),
    })
    payload = deserialize_and_validate(ArchivePolicySchema)

    # Validate the data
    try:
        policy = archive_policy.ArchivePolicy.from_dict(payload)
    except ValueError as e:
        abort(400, six.text_type(e))
    enforce("create archive policy", policy)

    try:
        policy = pecan.request.indexer.create_archive_policy(policy)
    except indexer.ArchivePolicyAlreadyExists as e:
        abort(409, six.text_type(e))

    set_resp_location_hdr("/archive_policy/" + policy.name)
    pecan.response.status = 201
    return policy
def subvoat_name(self, subvoat_name):
    """Check ``subvoat_name`` against the configured minimum length.

    The value must be a string of at least
    ``config['min_length_subvoat_name']`` characters.  The result of
    ``self.try_schema`` for this field is returned unchanged.
    """
    shortest = self.config['min_length_subvoat_name']
    name_rule = All(str, Length(min=shortest))
    name_schema = Schema({Required('subvoat_name'): name_rule})
    return self.try_schema('subvoat_name', subvoat_name, name_schema)
def uuid(self, uuid):
    """Check that ``uuid`` is a string of exactly 36 characters.

    Only the type and the length are validated here.  The result of
    ``self.try_schema`` for this field is returned unchanged.
    """
    exactly_36 = Length(min=36, max=36)
    uuid_schema = Schema({Required('uuid'): All(str, exactly_36)})
    return self.try_schema('uuid', uuid, uuid_schema)
def comment_body(self, comment_body):
    """Check ``comment_body`` against the configured length bounds.

    The value must be a string whose length lies between
    ``config['min_length_comment_body']`` and
    ``config['max_length_comment_body']``.  The result of
    ``self.try_schema`` for this field is returned unchanged.
    """
    shortest = self.config['min_length_comment_body']
    longest = self.config['max_length_comment_body']
    body_rule = All(str, Length(min=shortest, max=longest))
    body_schema = Schema({Required('comment_body'): body_rule})
    return self.try_schema('comment_body', comment_body, body_schema)
def thread(self, subvoat_name, title, body):
    """Validate the pieces of a new thread, stopping at the first failure.

    The subvoat name is checked first (via ``self.subvoat_name``), then the
    title, then the body, each against its configured length bounds.

    Returns ``[status, result]`` of the first failing check, or
    ``[True, None]`` when every check passes.
    """
    # Both schemas are built before any validation runs, so the config
    # lookups happen up front.
    title_rule = All(str, Length(min=self.config['min_length_thread_title'],
                                 max=self.config['max_length_thread_title']))
    body_rule = All(str, Length(min=self.config['min_length_thread_body'],
                                max=self.config['max_length_thread_body']))
    field_checks = (
        ('title', title, Schema({Required('title'): title_rule})),
        ('body', body, Schema({Required('body'): body_rule})),
    )

    # validate the subvoat_name first
    status, result = self.subvoat_name(subvoat_name)
    if not status:
        return [status, result]

    # then the thread title, and finally the thread body
    for field, value, field_schema in field_checks:
        status, result = self.try_schema(field, value, field_schema)
        if not status:
            return [status, result]

    # everything is ok
    return [True, None]
def password(self, password):
    """Check ``password`` against the configured length bounds.

    The value must be a string whose length lies between
    ``config['min_length_password']`` and ``config['max_length_password']``.
    The result of ``self.try_schema`` for this field is returned unchanged.
    """
    shortest = self.config['min_length_password']
    longest = self.config['max_length_password']
    password_rule = All(str, Length(min=shortest, max=longest))
    password_schema = Schema({Required('password'): password_rule})
    return self.try_schema('password', password, password_schema)
def api_token(self, api_token):
    """Check that ``api_token`` is a string of exactly 36 characters.

    The result of ``self.try_schema`` for this field is returned unchanged.
    """
    # TODO: make this actually check the token type; for now only the
    # string type and the length are verified.
    exactly_36 = Length(min=36, max=36)
    token_schema = Schema({Required('api_token'): All(str, exactly_36)})
    return self.try_schema('api_token', api_token, token_schema)
def __init__(self, server):
    """Store the server handles, build the guild/channel schemas, then register.

    Three schemas are created, all of which drop unknown keys
    (``extra=REMOVE_EXTRA``):

    * ``guild_edit_schema`` - every key is marked ``Optional``.
    * ``guild_create_schema`` - name, region, icon (``None`` or a string),
      verification level and default message notifications.
    * ``channel_create_schema`` - a ``name`` of 2-100 characters plus
      optional type, bitrate, user limit and permission overwrites.

    ``self.register()`` is called last, after all schemas exist.
    """
    self.server = server
    self.guild_man = server.guild_man

    guild_edit_fields = {
        Optional('name'): str,
        Optional('region'): str,
        Optional('verification_level'): int,
        Optional('default_message_notifications'): int,
        Optional('afk_channel_id'): str,
        Optional('afk_timeout'): int,
        Optional('icon'): str,
        Optional('owner_id'): str,
    }
    self.guild_edit_schema = Schema(
        guild_edit_fields, required=True, extra=REMOVE_EXTRA)

    guild_create_fields = {
        'name': str,
        'region': str,
        'icon': Any(None, str),
        'verification_level': int,
        'default_message_notifications': int,
    }
    self.guild_create_schema = Schema(guild_create_fields, extra=REMOVE_EXTRA)

    channel_create_fields = {
        'name': All(str, Length(min=2, max=100)),
        Optional('type'): int,
        Optional('bitrate'): int,
        Optional('user_limit'): int,
        Optional('permission_overwrites'): list,
    }
    self.channel_create_schema = Schema(
        channel_create_fields, required=True, extra=REMOVE_EXTRA)

    self.register()
def valid_subscribe_topic(value, invalid_chars='\0'):
    """Validate that we can subscribe using this MQTT topic.

    ``value`` must be a ``str`` containing none of the characters in
    ``invalid_chars``; otherwise ``vol.Invalid`` is raised.  A value that
    passes is handed to ``vol.Length(min=1, max=65535)`` and that
    validator's result is returned.
    """
    if not isinstance(value, str) or any(c in value for c in invalid_chars):
        raise vol.Invalid('Invalid MQTT topic name')
    return vol.Length(min=1, max=65535)(value)
def MetricSchema(definition):
    """Validate a metric definition, fill in its archive policy, and authorize it.

    Steps, in order:

    1. Basic shape validation (``archive_policy_name``, ``resource_id``,
       ``name`` and an optional ``unit`` of at most 31 characters).
    2. Reject names containing ``/``.
    3. When no archive policy name was given, look one up from the metric
       name through the indexer and store it in the definition; a missing
       rule is reported with ``abort(400, ...)``.
    4. When a resource id is present, require a name and unpack the
       ``(original_resource_id, resource_id)`` pair.
    5. Enforce the "create metric" rule.

    Returns the validated definition.
    """
    creator = pecan.request.auth_helper.get_current_user(pecan.request)

    # First basic validation
    unit_rule = voluptuous.All(six.text_type, voluptuous.Length(max=31))
    basic_schema = voluptuous.Schema({
        "archive_policy_name": six.text_type,
        "resource_id": functools.partial(ResourceID, creator=creator),
        "name": six.text_type,
        voluptuous.Optional("unit"): unit_rule,
    })
    definition = basic_schema(definition)

    archive_policy_name = definition.get('archive_policy_name')
    name = definition.get('name')

    if name and '/' in name:
        abort(400, "'/' is not supported in metric name")

    if archive_policy_name is None:
        try:
            ap = pecan.request.indexer.get_archive_policy_for_metric(name)
        except indexer.NoArchivePolicyRuleMatch:
            # NOTE(jd) As a schema-like function this could raise
            # ValueError, but voluptuous would then only report a bare
            # "invalid value" with no useful message, so abort() is used to
            # make sure the user gets the right error message.
            abort(400, "No archive policy name specified "
                  "and no archive policy rule found matching "
                  "the metric name %s" % name)
        else:
            definition['archive_policy_name'] = ap.name

    resource_id = definition.get('resource_id')
    if resource_id is not None:
        if name is None:
            abort(400, {"cause": "Attribute value error",
                        "detail": "name",
                        "reason": "Name cannot be null "
                                  "if resource_id is not null"})
        original_resource_id, resource_id = resource_id
    else:
        original_resource_id = None

    # NOTE(review): the local ``archive_policy_name`` is not refreshed after
    # the rule lookup above, so the policy check still sees None in that
    # case - confirm whether that is intended.
    enforce("create metric", {
        "creator": creator,
        "archive_policy_name": archive_policy_name,
        "resource_id": resource_id,
        "original_resource_id": original_resource_id,
        "name": name,
        "unit": definition.get('unit'),
    })

    return definition