def validate(schema, data):
    """
    Run a voluptuous schema over some data, translating validation failures.

    Args:
        schema: The voluptuous Schema object (any callable taking the data)
        data: The data handed to the schema

    Raises:
        APIException: built from status 0, None and the voluptuous error
            message when the schema raises MultipleInvalid
    """
    try:
        schema(data)
    except MultipleInvalid as invalid:
        message = invalid.msg
        raise APIException(0, None, message)
def get_problem(problem_path):
    """
    Retrieve a problem spec from a given problem directory.

    Reads ``problem.json`` from the directory, parses it as JSON and checks
    it against ``problem_schema``.

    Args:
        problem_path: path to the root of the problem directory.

    Returns:
        A problem object (the parsed JSON document).

    Raises:
        FatalException: if the parsed document fails schema validation.
    """
    json_path = join(problem_path, "problem.json")

    # Use a context manager so the file handle is closed even if reading or
    # parsing fails (the handle was previously left for the GC to close).
    with open(json_path, "r") as problem_file:
        problem = json.loads(problem_file.read())

    try:
        problem_schema(problem)
    except MultipleInvalid as e:
        logger.critical("Error validating problem object at '%s'!", json_path)
        logger.critical(e)
        raise FatalException

    return problem
def get_bundle(bundle_path):
    """
    Retrieve a bundle spec from a given bundle directory.

    Reads ``bundle.json`` from the directory, parses it as JSON and checks
    it against ``bundle_schema``.

    Args:
        bundle_path: path to the root of the bundle directory.

    Returns:
        A bundle object (the parsed JSON document).

    Raises:
        FatalException: if the parsed document fails schema validation.
    """
    json_path = join(bundle_path, "bundle.json")

    # Use a context manager so the file handle is closed even if reading or
    # parsing fails (the handle was previously left for the GC to close).
    with open(json_path, "r") as bundle_file:
        bundle = json.loads(bundle_file.read())

    try:
        bundle_schema(bundle)
    except MultipleInvalid as e:
        logger.critical("Error validating bundle object at '%s'!", json_path)
        logger.critical(e)
        raise FatalException

    return bundle
def test_entity_ids():
    """Check that cv.entity_ids rejects malformed IDs and accepts valid ones."""
    schema = vol.Schema(cv.entity_ids)

    rejected = (
        'invalid_entity',
        'sensor.light,sensor_invalid',
        ['invalid_entity'],
        ['sensor.light', 'sensor_invalid'],
        ['sensor.light,sensor_invalid'],
    )
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            schema(bad)

    accepted = ([], ['sensor.light'], 'sensor.light')
    for good in accepted:
        schema(good)

    # A comma-separated string is expected to come back as a cleaned-up list.
    parsed = schema('sensor.LIGHT, light.kitchen ')
    assert parsed == ['sensor.light', 'light.kitchen']
def test_time_period():
    """Check cv.time_period against invalid, valid and exact-value inputs."""
    schema = vol.Schema(cv.time_period)

    rejected = (
        None, '', 'hello:world', '12:', '12:34:56:78',
        {}, {'wrong_key': -10},
    )
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            schema(bad)

    accepted = (
        '8:20', '23:59', '-8:20', '-23:59:59', '-48:00',
        {'minutes': 5}, 1, '5',
    )
    for good in accepted:
        schema(good)

    # Pairs of (expected timedelta, raw input handed to the schema).
    conversions = (
        (timedelta(seconds=180), '180'),
        (timedelta(hours=23, minutes=59), '23:59'),
        (-1 * timedelta(hours=1, minutes=15), '-1:15'),
    )
    for expected, raw in conversions:
        assert expected == schema(raw)
def valid_adapter_response(base_name, func_name, data):
    """
    Check that the given data matches the schema registered for a function.

    Falsy data (None, empty containers, ...) is accepted without any check.

    :param str base_name: The name of the abstract
    :param str func_name: The name of the called function
    :param raw data: The data to validate
    :raises SchemaNotFound: if the schema lookup or call raises KeyError,
        TypeError or ValueError
    :raises InvalidFormatError: if data is not compliant
    """
    if not data:
        return

    try:
        Schemas[base_name][func_name](data)
    except (KeyError, TypeError, ValueError):
        not_found = 'Schema not found for %s.%s' % (base_name, func_name)
        raise SchemaNotFound(not_found)
    except (Invalid, MultipleInvalid) as error:
        details = (base_name, func_name, str(error))
        raise InvalidFormatError('Given data is not compliant to %s.%s schema: %s' % details)
def validate_body(schema_desc):
    """
    Decorator factory validating the JSON request body against a schema.

    The schema is built from ``schema_desc`` on first use and stored in
    ``Schemas`` under the decorated function's name. An invalid body is
    logged and reported as a BadRequest.
    """
    def real_decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                payload = request.get_json()
                # Lazily register the schema the first time the view runs.
                if not Schemas.get(func.__name__):
                    Schemas[func.__name__] = Schema(schema_desc, required=True)
                    Logger.debug(unicode('registering schema for %s' % (func.__name__)))
                validator = Schemas[func.__name__]
                validator(payload)
            except (Invalid, MultipleInvalid) as error:
                Logger.error(unicode(error))
                raise BadRequest(
                    'Missing or invalid field(s) in body, expecting {}'.format(schema_desc)
                )
            return func(*args, **kwargs)
        return wrapper
    return real_decorator
def process_cleanup_arch(self):
    """Delete the architectures listed in the 'cleanup-architecture' section.

    Entries that fail YAML validation are skipped with a warning. Any error
    during lookup or deletion is logged as "already absent" (best effort).
    """
    log.log(log.LOG_INFO, "Processing Cleanup of Architectures")
    for arch in self.get_config_section('cleanup-architecture'):
        try:
            self.validator.cleanup_arch(arch)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot delete Architecture '{0}': YAML validation Error: {1}".format(arch['name'], e))
            continue

        try:
            # Existence probe: the result is discarded, the subscript only
            # serves to raise when the lookup did not yield a record.
            self.fm.architectures.show(arch['name'])['id']
            log.log(log.LOG_INFO, "Delete Architecture '{0}'".format(arch['name']))
            self.fm.architectures.destroy(arch['name'])
        except Exception:
            # Was a bare ``except:``; narrowed so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            # NOTE(review): a failing destroy() is also reported as "absent".
            log.log(log.LOG_WARN, "Architecture '{0}' already absent.".format(arch['name']))
def process_cleanup_computeprfl(self):
    """Delete the compute profiles listed in the 'cleanup-compute-profile' section.

    Entries that fail YAML validation are skipped with a warning. Any error
    during lookup or deletion is logged as "already absent" (best effort).
    """
    log.log(log.LOG_INFO, "Processing Cleanup of Compute profiles")
    for computeprfl in self.get_config_section('cleanup-compute-profile'):
        try:
            self.validator.cleanup_computeprfl(computeprfl)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot delete Compute profile '{0}': YAML validation Error: {1}".format(computeprfl['name'], e))
            continue

        try:
            # Existence probe: the result is discarded, the subscript only
            # serves to raise when the lookup did not yield a record.
            self.fm.compute_profiles.show(computeprfl['name'])['id']
            log.log(log.LOG_INFO, "Delete Compute profile '{0}'".format(computeprfl['name']))
            self.fm.compute_profiles.destroy(computeprfl['name'])
        except Exception:
            # Was a bare ``except:``; narrowed so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            # NOTE(review): a failing destroy() is also reported as "absent".
            log.log(log.LOG_WARN, "Compute profile '{0}' already absent.".format(computeprfl['name']))
def process_cleanup_medium(self):
    """Delete the media listed in the 'cleanup-medium' config section.

    The list of existing media is fetched once up front; every existing
    medium whose name matches a validated config entry is destroyed.
    """
    log.log(log.LOG_INFO, "Processing Cleanup of Media")
    medialist = self.fm.media.index(per_page=99999)['results']

    for medium in self.get_config_section('cleanup-medium'):
        try:
            self.validator.cleanup_medium(medium)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot delete Medium '{0}': YAML validation Error: {1}".format(medium['name'], e))
            continue

        wanted = medium['name']
        found = False
        # Looking a medium up by name is reported not to work, so scan the
        # pre-fetched index instead.
        for existing in medialist:
            if existing['name'] != wanted:
                continue
            found = True
            log.log(log.LOG_INFO, "Delete Medium '{0}'".format(wanted))
            self.fm.media.destroy(wanted)

        if not found:
            log.log(log.LOG_WARN, "Medium '{0}' already absent.".format(wanted))
def process_cleanup_ptable(self):
    """Delete the partition tables listed in the 'cleanup-partition-table' section.

    Entries that fail YAML validation are skipped with a warning. Any error
    during lookup or deletion is logged as "already absent" (best effort).
    """
    log.log(log.LOG_INFO, "Processing Cleanup of Partition Tables")
    for ptable in self.get_config_section('cleanup-partition-table'):
        try:
            self.validator.cleanup_ptable(ptable)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot delete Partition Table '{0}': YAML validation Error: {1}".format(ptable['name'], e))
            continue

        try:
            # Existence probe: the result is discarded, the subscript only
            # serves to raise when the lookup did not yield a record.
            self.fm.ptables.show(ptable['name'])['id']
            log.log(log.LOG_INFO, "Delete Partition Table '{0}'".format(ptable['name']))
            self.fm.ptables.destroy(ptable['name'])
        except Exception:
            # Was a bare ``except:``; narrowed so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            # NOTE(review): a failing destroy() is also reported as "absent".
            log.log(log.LOG_WARN, "Partition Table '{0}' already absent.".format(ptable['name']))
def process_cleanup_provisioningtpl(self):
    """Delete the templates listed in the 'cleanup-provisioning-template' section.

    The list of existing provisioning templates is fetched once up front;
    every existing template whose name matches a validated config entry is
    destroyed.
    """
    log.log(log.LOG_INFO, "Processing Cleanup of Provisioning Templates")
    ptlist = self.fm.provisioning_templates.index(per_page=99999)['results']

    for pt in self.get_config_section('cleanup-provisioning-template'):
        try:
            self.validator.cleanup_provt(pt)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot delete Provisioning Template '{0}': YAML validation Error: {1}".format(pt['name'], e))
            continue

        wanted = pt['name']
        found = False
        # Looking a template up by name is reported not to work as expected,
        # so scan the pre-fetched index instead.
        for existing in ptlist:
            if existing['name'] != wanted:
                continue
            found = True
            log.log(log.LOG_INFO, "Delete Provisioning Template '{0}'".format(wanted))
            self.fm.provisioning_templates.destroy(wanted)

        if not found:
            log.log(log.LOG_WARN, "Provisioning Template '{0}' already absent.".format(wanted))
def process_config_enviroment(self):
    """Create the environments from the 'environment' section that do not exist yet.

    Existing environments are fetched once up front and matched by name.
    """
    log.log(log.LOG_INFO, "Processing Environments")
    envlist = self.fm.environments.index(per_page=99999)['results']

    for env in self.get_config_section('environment'):
        try:
            self.validator.enviroment(env)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot create Environment '{0}': YAML validation Error: {1}".format(env['name'], e))
            continue

        wanted = env['name']
        env_id = False
        # Match by name against the pre-fetched index.
        for existing in envlist:
            if wanted != existing['name']:
                continue
            env_id = existing['id']
            log.log(log.LOG_DEBUG, "Environment '{0}' (id={1}) already present.".format(wanted, env_id))

        if env_id:
            continue

        log.log(log.LOG_INFO, "Create Environment '{0}'".format(wanted))
        self.fm.environments.create(environment={'name': wanted})
def process_config_model(self):
    """Create the hardware models from the 'model' section that do not exist yet.

    Entries that fail YAML validation are skipped with a warning. A model is
    created whenever looking it up by name raises an error.
    """
    log.log(log.LOG_INFO, "Processing Models")
    for model in self.get_config_section('model'):
        try:
            self.validator.model(model)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot create Model '{0}': YAML validation Error: {1}".format(model['name'], e))
            continue

        try:
            model_id = self.fm.models.show(model['name'])['id']
            log.log(log.LOG_DEBUG, "Model '{0}' (id={1}) already present.".format(model['name'], model_id))
        except Exception:
            # Was a bare ``except:``; narrowed so that KeyboardInterrupt and
            # SystemExit are not mistaken for "model missing".
            log.log(log.LOG_INFO, "Create Model '{0}'".format(model['name']))
            # Map the dashed YAML keys onto the underscore names sent to the API.
            model_tpl = {
                'name': model['name'],
                'info': model['info'],
                'vendor_class': model['vendor-class'],
                'hardware_model': model['hardware-model'],
            }
            self.fm.models.create(model=model_tpl)
def process_config_medium(self):
    """Create the installation media from the 'medium' section that do not exist yet.

    Existing media are fetched once up front and matched by name.
    """
    log.log(log.LOG_INFO, "Processing Media")
    medialist = self.fm.media.index(per_page=99999)['results']

    for medium in self.get_config_section('medium'):
        try:
            self.validator.medium(medium)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot create Media '{0}': YAML validation Error: {1}".format(medium['name'], e))
            continue

        wanted = medium['name']
        medium_id = False
        # Looking a medium up by name is reported not to work, so scan the
        # pre-fetched index instead.
        for existing in medialist:
            if existing['name'] != wanted:
                continue
            medium_id = existing['id']
            log.log(log.LOG_DEBUG, "Medium '{0}' (id={1}) already present.".format(wanted, medium_id))

        if medium_id:
            continue

        log.log(log.LOG_INFO, "Create Medium '{0}'".format(wanted))
        self.fm.media.create(medium={
            'name': medium['name'],
            'path': medium['path'],
            'os_family': medium['os-family'],
        })
def process_config_settings(self):
    """Update the Foreman settings listed in the 'setting' config section.

    Entries that fail YAML validation are skipped with a warning. A setting
    whose ID cannot be looked up is logged and left untouched.
    """
    log.log(log.LOG_INFO, "Processing Foreman Settings")
    for setting in self.get_config_section('setting'):
        try:
            self.validator.setting(setting)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot update Setting '{0}': YAML validation Error: {1}".format(setting['name'], e))
            continue

        setting_id = False
        try:
            setting_id = self.fm.settings.show(setting['name'])['id']
        except Exception:
            # Was a bare ``except:``; narrowed so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            log.log(log.LOG_WARN, "Cannot get ID of Setting '{0}', skipping".format(setting['name']))

        setting_tpl = {'value': setting['value']}

        if setting_id:
            log.log(log.LOG_INFO, "Update Setting '{0}'".format(setting['name']))
            self.fm.settings.update(setting_tpl, setting_id)
def process_auth_sources_ldap(self):
    """Create the LDAP auth sources from the 'auth-source-ldap' section.

    Entries that fail YAML validation, or that already exist, are skipped
    with a warning. Creation errors are logged, not raised.
    """
    log.log(log.LOG_INFO, "Processing LDAP auth sources")
    for auth in self.get_config_section('auth-source-ldap'):
        # validate yaml
        try:
            self.validator.auth_source_ldaps(auth)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot create LDAP source '{0}': YAML validation Error: {1}".format(auth['name'], e))
            continue

        try:
            # Existence probe: the ID itself is not needed (the unused local
            # that used to hold it was removed). A TypeError here is treated
            # as "source does not exist yet".
            self.fm.auth_source_ldaps.show(auth['name'])['id']
            log.log(log.LOG_WARN, "LDAP source {0} allready exists".format(auth['name']))
            continue
        except TypeError:
            pass

        ldap_auth_obj = self.dict_underscore(auth)
        try:
            self.fm.auth_source_ldaps.create(auth_source_ldap=ldap_auth_obj)
        except Exception:
            # Was a bare ``except:``; narrowed so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            log.log(log.LOG_ERROR, "Something went wrong creating LDAP source {0}".format(auth['name']))
def get(self):
    """Return the posts of the subvoat named by the 'subvoat_name' argument.

    The name is validated first; on failure a dict with an 'error' entry is
    returned, otherwise a dict whose 'result' entry lists the posts.
    """
    parser = reqparse.RequestParser()
    parser.add_argument('subvoat_name')
    args = parser.parse_args()

    min_length = self.config['min_length_subvoat_name']
    schema = Schema({
        Required('subvoat_name'): All(str, Length(min=min_length)),
    })

    try:
        schema({'subvoat_name': args.get('subvoat_name')})
    except MultipleInvalid as e:
        return {'error': '%s %s' % (e.msg, e.path)}

    posts = self.subvoat_utils.get_posts(args['subvoat_name'])
    return {'result': list(posts)}
def test_event_schema():
    """Check cv.EVENT_SCHEMA against invalid and valid event definitions."""
    rejected = (
        {},
        None,
        {'event_data': {}},
        {'event': 'state_changed', 'event_data': 1},
    )
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            cv.EVENT_SCHEMA(bad)

    accepted = (
        {'event': 'state_changed'},
        {'event': 'state_changed', 'event_data': {'hello': 'world'}},
    )
    for good in accepted:
        cv.EVENT_SCHEMA(good)
def test_core_config_schema(self):
    """Check CORE_CONFIG_SCHEMA rejects bad options and accepts a full config."""
    rejected = (
        {CONF_UNIT_SYSTEM: 'K'},
        {'time_zone': 'non-exist'},
        {'latitude': '91'},
        {'longitude': -181},
        {'customize': 'bla'},
        {'customize': {'invalid_entity_id': {}}},
        {'customize': {'light.sensor': 100}},
    )
    for bad in rejected:
        with pytest.raises(MultipleInvalid):
            config_util.CORE_CONFIG_SCHEMA(bad)

    valid_config = {
        'name': 'Test name',
        'latitude': '-23.45',
        'longitude': '123.45',
        CONF_UNIT_SYSTEM: CONF_UNIT_SYSTEM_METRIC,
        'customize': {
            'sensor.temperature': {
                'hidden': True,
            },
        },
    }
    config_util.CORE_CONFIG_SCHEMA(valid_config)
def verify_config(config_object):
    """
    Verifies the given configuration dict against the config_schema and
    the port_range_schema, and checks that every banned port range has
    start <= end.

    Args:
        config_object: The configuration options in a dict

    Raises:
        FatalException: if the config or any port range fails validation,
            or a port range has start > end.
    """
    # NOTE(review): `path` is neither a parameter nor a local here; it is
    # presumably a module-level name -- confirm it is defined.
    try:
        config_schema(config_object)
    except MultipleInvalid as e:
        logger.critical("Error validating config file at '%s'!", path)
        logger.critical(e)
        raise FatalException

    for port_range in config_object["banned_ports"]:
        try:
            port_range_schema(port_range)
        except MultipleInvalid as e:
            logger.critical("Error validating port range in config file at '%s'!", path)
            logger.critical(e)
            raise FatalException

        # Explicit comparison instead of ``assert``: assertions are removed
        # when Python runs with -O, which would silently accept an inverted
        # range.
        if not port_range["start"] <= port_range["end"]:
            logger.critical("Invalid port range: (%d -> %d)",
                            port_range["start"], port_range["end"])
            raise FatalException
def __init__(self, **kargs):
    """Initialise the entity from keyword arguments and validate it.

    The keyword arguments are handed to the parent initialiser as a single
    dict, then ``self.validate()`` runs. If the instance afterwards has an
    ``error`` attribute it is raised; an ``Invalid`` error is re-raised
    wrapped in ``MultipleInvalid``.
    """
    super(EntityBase, self).__init__(kargs)
    self.validate()
    try:
        raise self.error
    except AttributeError:
        # No ``error`` attribute on the instance (or the error raised was
        # itself an AttributeError): nothing to report.
        pass
    except Invalid as invalid:
        raise MultipleInvalid([invalid])
def test_invalid_products(self):
    """Building a Produto with codigo='abc' must raise MultipleInvalid."""
    with self.assertRaises(MultipleInvalid):
        Produto(codigo='abc')
def test_invalid_coletas(self):
    """Building a ColetaSimultanea with codigo='abc' must raise MultipleInvalid."""
    # NOTE(review): another ``test_invalid_coletas`` exists in this file; if
    # both live in the same class the later definition shadows this one --
    # confirm.
    with self.assertRaises(MultipleInvalid):
        ColetaSimultanea(codigo='abc')
def test_invalid_objetos(self):
    """Building an Objeto with id='1' or item=None must raise MultipleInvalid."""
    with self.assertRaises(MultipleInvalid):
        Objeto(id='1')
    with self.assertRaises(MultipleInvalid):
        Objeto(item=None)
def test_invalid_coletas(self):
    """Building a Coleta with codigo='abc' must raise MultipleInvalid."""
    with self.assertRaises(MultipleInvalid):
        Coleta(codigo='abc')
def test_boolean():
    """Check cv.boolean against invalid, truthy and falsy inputs."""
    schema = vol.Schema(cv.boolean)

    rejected = ('T', 'negative', 'lock')
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            schema(bad)

    truthy = ('true', 'On', '1', 'YES', 'enable', 1, True)
    for raw in truthy:
        assert schema(raw)

    falsy = ('false', 'Off', '0', 'NO', 'disable', 0, False)
    for raw in falsy:
        assert not schema(raw)
def test_latitude():
    """Check cv.latitude against out-of-range, malformed and valid inputs."""
    schema = vol.Schema(cv.latitude)

    rejected = ('invalid', None, -91, 91, '-91', '91', '123.01A')
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            schema(bad)

    accepted = ('-89', 89, '12.34')
    for good in accepted:
        schema(good)
def test_longitude():
    """Check cv.longitude against out-of-range, malformed and valid inputs."""
    schema = vol.Schema(cv.longitude)

    rejected = ('invalid', None, -181, 181, '-181', '181', '123.01A')
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            schema(bad)

    accepted = ('-179', 179, '12.34')
    for good in accepted:
        schema(good)
def test_port():
    """Check cv.port (TCP/UDP network port) against invalid and valid inputs."""
    schema = vol.Schema(cv.port)

    rejected = ('invalid', None, -1, 0, 80000, '81000')
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            schema(bad)

    accepted = ('1000', 21, 24574)
    for good in accepted:
        schema(good)
def test_url():
    """Check cv.url against malformed and well-formed URLs."""
    schema = vol.Schema(cv.url)

    rejected = (
        'invalid', None, 100, 'htp://ha.io', 'http//ha.io',
        'http://??,**', 'https://??,**',
    )
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            schema(bad)

    accepted = (
        'http://localhost',
        'https://localhost/test/index.html',
        'http://home-assistant.io',
        'http://home-assistant.io/test/',
        'https://community.home-assistant.io/',
    )
    for good in accepted:
        assert schema(good)
def test_icon():
    """Check cv.icon rejects non-icon values and accepts 'mdi:work'."""
    schema = vol.Schema(cv.icon)

    rejected = (False, 'work', 'icon:work')
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            schema(bad)

    schema('mdi:work')
def test_service():
    """Check cv.service rejects 'invalid_turn_on' and accepts a dotted name."""
    schema = vol.Schema(cv.service)

    bad_service = 'invalid_turn_on'
    with pytest.raises(vol.MultipleInvalid):
        schema(bad_service)

    good_service = 'scarlett_os.turn_on'
    schema(good_service)
def test_slug():
    """Check cv.slug against invalid and valid inputs."""
    schema = vol.Schema(cv.slug)

    rejected = (None, 'hello world')
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            schema(bad)

    accepted = (12345, 'hello')
    for good in accepted:
        schema(good)
def test_temperature_unit():
    """Check cv.temperature_unit rejects 'K' and accepts 'C' and 'F'."""
    schema = vol.Schema(cv.temperature_unit)

    with pytest.raises(vol.MultipleInvalid):
        schema('K')

    for unit in ('C', 'F'):
        schema(unit)
def test_time_zone():
    """Check cv.time_zone rejects an unknown zone and accepts known ones."""
    schema = vol.Schema(cv.time_zone)

    with pytest.raises(vol.MultipleInvalid):
        schema('America/Do_Not_Exist')

    for zone in ('America/Los_Angeles', 'UTC'):
        schema(zone)
def test_has_at_least_one_key():
    """Check cv.has_at_least_one_key('beer', 'soda') on various inputs."""
    validator = cv.has_at_least_one_key('beer', 'soda')
    schema = vol.Schema(validator)

    rejected = (None, [], {}, {'wine': None})
    for bad in rejected:
        with pytest.raises(vol.MultipleInvalid):
            schema(bad)

    accepted = ({'beer': None}, {'soda': None})
    for good in accepted:
        schema(good)
def process_config_domain(self):
    """Create the domains from the 'domain' config section that do not exist yet.

    Entries that fail YAML validation are skipped with a warning. A domain
    is created whenever looking it up by name raises an error; its
    parameters, if any, are attached with a follow-up update call.
    """
    log.log(log.LOG_INFO, "Processing Domains")
    for domain in self.get_config_section('domain'):
        try:
            self.validator.domain(domain)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot create Domain '{0}': YAML validation Error: {1}".format(domain['name'], e))
            continue

        try:
            dom_id = self.fm.domains.show(domain['name'])['id']
            log.log(log.LOG_DEBUG, "Domain '{0}' (id={1}) already present.".format(domain['name'], dom_id))
        except Exception:
            # Was a bare ``except:``; narrowed so that KeyboardInterrupt and
            # SystemExit are not mistaken for "domain missing".
            dns_proxy_id = False
            try:
                dns_proxy_id = self.fm.smart_proxies.show(domain['dns-proxy'])['id']
            except Exception:
                # Best effort: the domain is still created, without a DNS proxy.
                log.log(log.LOG_WARN, "Cannot get ID of DNS Smart Proxy '{0}', skipping".format(domain['dns-proxy']))

            log.log(log.LOG_INFO, "Create Domain '{0}'".format(domain['name']))

            dom_params = []
            if domain['parameters']:
                # ``.items()`` works on both Python 2 and 3, unlike ``.iteritems()``.
                dom_params = [
                    {'name': name, 'value': value}
                    for name, value in domain['parameters'].items()
                ]

            dom_tpl = {
                'name': domain['name'],
                'fullname': domain['fullname'],
            }
            fixdom = {
                'domain_parameters_attributes': dom_params
            }
            if dns_proxy_id:
                dom_tpl['dns_id'] = dns_proxy_id

            domo = self.fm.domains.create(domain=dom_tpl)
            # Parameters are sent in a separate update after creation.
            if dom_params:
                self.fm.domains.update(fixdom, domo['id'])
def process_config_os(self):
    """Create the operating systems from the 'os' config section that do not exist yet.

    Entries that fail YAML validation are skipped with a warning. An
    operating system is created whenever looking it up by description raises
    an error; its parameters are then added one by one, best effort.
    """
    log.log(log.LOG_INFO, "Processing Operating Systems")
    for operatingsystem in self.get_config_section('os'):
        try:
            self.validator.os(operatingsystem)
        except MultipleInvalid as e:
            log.log(log.LOG_WARN, "Cannot create Operating System '{0}': YAML validation Error: {1}".format(operatingsystem['name'], e))
            continue

        try:
            # The lookup uses the description, not the name.
            os_id = self.fm.operatingsystems.show(operatingsystem['description'])['id']
            log.log(log.LOG_DEBUG, "Operating System '{0}' (id={1}) already present.".format(operatingsystem['name'], os_id))
        except Exception:
            # Was a bare ``except:``; narrowed so that KeyboardInterrupt and
            # SystemExit are not mistaken for "operating system missing".
            log.log(log.LOG_INFO, "Create Operating System '{0}'".format(operatingsystem['name']))
            # Map the dashed YAML keys onto the underscore names sent to the API.
            os_tpl = {
                'name': operatingsystem['name'],
                'description': operatingsystem['description'],
                'major': operatingsystem['major'],
                'minor': operatingsystem['minor'],
                'family': operatingsystem['family'],
                'release_name': operatingsystem['release-name'],
                'password_hash': operatingsystem['password-hash'],
            }
            os_obj = self.fm.operatingsystems.create(operatingsystem=os_tpl)

            # host_params
            if operatingsystem['parameters'] is not None:
                # ``.items()`` works on both Python 2 and 3, unlike ``.iteritems()``.
                for name, value in operatingsystem['parameters'].items():
                    p = {
                        'name': name,
                        'value': value
                    }
                    try:
                        self.fm.operatingsystems.parameters_create(os_obj['id'], p)
                    except Exception:
                        # One failing parameter must not stop the others.
                        log.log(log.LOG_WARN, "Error adding host parameter '{0}'".format(name))