我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用oslo_utils.uuidutils.is_uuid_like()。
def add_identity_filter(query, value): """Adds an identity filter to a query. Filters results by ID, if supplied value is a valid integer. Otherwise attempts to filter results by UUID. :param query: Initial query to add filter to. :param value: Value for filtering results by. :return: Modified query. """ if strutils.is_int_like(value): return query.filter_by(id=value) elif uuidutils.is_uuid_like(value): return query.filter_by(uuid=value) else: raise exception.InvalidIdentity(identity=value)
def destroy_board(self, board_id): session = get_session() with session.begin(): query = model_query(models.Board, session=session) query = add_identity_filter(query, board_id) try: board_ref = query.one() except NoResultFound: raise exception.BoardNotFound(board=board_id) # Get board ID, if an UUID was supplied. The ID is # required for deleting all ports, attached to the board. if uuidutils.is_uuid_like(board_id): board_id = board_ref['id'] location_query = model_query(models.Location, session=session) location_query = self._add_location_filter_by_board( location_query, board_id) location_query.delete() query.delete()
def destroy_plugin(self, plugin_id): session = get_session() with session.begin(): query = model_query(models.Plugin, session=session) query = add_identity_filter(query, plugin_id) try: plugin_ref = query.one() except NoResultFound: raise exception.PluginNotFound(plugin=plugin_id) # Get plugin ID, if an UUID was supplied. The ID is # required for deleting all ports, attached to the plugin. if uuidutils.is_uuid_like(plugin_id): plugin_id = plugin_ref['id'] query.delete()
def get_rpc_plugin(plugin_ident): """Get the RPC plugin from the plugin uuid or logical name. :param plugin_ident: the UUID or logical name of a plugin. :returns: The RPC Plugin. :raises: InvalidUuidOrName if the name or uuid provided is not valid. :raises: PluginNotFound if the plugin is not found. """ # Check to see if the plugin_ident is a valid UUID. If it is, treat it # as a UUID. if uuidutils.is_uuid_like(plugin_ident): return objects.Plugin.get_by_uuid(pecan.request.context, plugin_ident) # We can refer to plugins by their name, if the client supports it # if allow_plugin_logical_names(): # if utils.is_hostname_safe(plugin_ident): else: return objects.Plugin.get_by_name(pecan.request.context, plugin_ident) raise exception.InvalidUuidOrName(name=plugin_ident) raise exception.PluginNotFound(plugin=plugin_ident)
def do_verification_create(cs, args): """Creates a verification.""" if not uuidutils.is_uuid_like(args.provider_id): raise exceptions.CommandError( "Invalid provider id provided.") if not uuidutils.is_uuid_like(args.checkpoint_id): raise exceptions.CommandError( "Invalid checkpoint id provided.") verification_parameters = arg_utils.extract_parameters(args) verification = cs.verifications.create(args.provider_id, args.checkpoint_id, verification_parameters) dict_format_list = {"parameters"} utils.print_dict(verification.to_dict(), dict_format_list=dict_format_list)
def add_identity_filter(query, value): """Adds an identity filter to a query. Filters results by ID, if supplied value is a valid integer. Otherwise attempts to filter results by UUID. :param query: Initial query to add filter to. :param value: Value for filtering results by. :return: Modified query. """ if strutils.is_int_like(value): return query.filter_by(id=value) elif uuidutils.is_uuid_like(value): return query.filter_by(uuid=value) else: raise exception.InvalidParameterValue(identity=value)
def get_uuid_by_name(manager, name, segment=None): """Helper methods for getting uuid of segment or host by name. :param manager: A client manager class :param name: The resource we are trying to find a uuid :param segment: segment id, default None :return: The uuid of found resource """ # If it cannot be found return the name. uuid = name if not uuidutils.is_uuid_like(name): if segment: items = manager.hosts(segment) else: items = manager.segments() for item in items: item_name = getattr(item, 'name') if item_name == name: uuid = getattr(item, 'uuid') break return uuid
def _get_floating_ip_pool_id_by_name_or_id(self, client, name_or_id): search_opts = {constants.NET_EXTERNAL: True, 'fields': 'id'} if uuidutils.is_uuid_like(name_or_id): search_opts.update({'id': name_or_id}) else: search_opts.update({'name': name_or_id}) data = client.list_networks(**search_opts) nets = data['networks'] if len(nets) == 1: return nets[0]['id'] elif len(nets) == 0: raise exception.FloatingIpPoolNotFound() else: msg = (_("Multiple floating IP pools matches found for name '%s'") % name_or_id) raise exception.NovaException(message=msg)
def fixed_ip_get_by_instance(context, instance_uuid): if not uuidutils.is_uuid_like(instance_uuid): raise exception.InvalidUUID(uuid=instance_uuid) vif_and = and_(models.VirtualInterface.id == models.FixedIp.virtual_interface_id, models.VirtualInterface.deleted == 0) result = model_query(context, models.FixedIp, read_deleted="no").\ filter_by(instance_uuid=instance_uuid).\ outerjoin(models.VirtualInterface, vif_and).\ options(contains_eager("virtual_interface")).\ options(joinedload('network')).\ options(joinedload('floating_ips')).\ order_by(asc(models.VirtualInterface.created_at), asc(models.VirtualInterface.id)).\ all() if not result: raise exception.FixedIpNotFoundForInstance(instance_uuid=instance_uuid) return result
def _get_requested_instance_group(context, filter_properties, check_quota): if (not filter_properties or not filter_properties.get('scheduler_hints')): return group_hint = filter_properties.get('scheduler_hints').get('group') if not group_hint: return # TODO(gibi): We need to remove the following validation code when # removing legacy v2 code. if not uuidutils.is_uuid_like(group_hint): msg = _('Server group scheduler hint must be a UUID.') raise exception.InvalidInput(reason=msg) return objects.InstanceGroup.get_by_uuid(context, group_hint)
def create_project(self, name, variables=None): url = self.url + '/v1/projects' payload = {'name': name} if variables: payload['variables'] = variables response = self.post(url, headers=self.root_headers, data=payload) self.assertEqual(201, response.status_code) self.assertIn('Location', response.headers) project = response.json() self.assertTrue(uuidutils.is_uuid_like(project['id'])) self.assertEqual( response.headers['Location'], "{}/{}".format(url, project['id']) ) return project
def process_request(self, request): headers = request.headers project_id = headers.get('X-Auth-Project') if not uuidutils.is_uuid_like(project_id): raise exceptions.AuthenticationError( message="Project ID ('{}') is not a valid UUID".format( project_id ) ) ctx = self.make_context( request, auth_token=headers.get('X-Auth-Token', None), user=headers.get('X-Auth-User', None), tenant=project_id, ) # NOTE(sulo): this means every api call hits the db # at least once for auth. Better way to handle this? try: user_info = dbapi.get_user_info(ctx, headers.get('X-Auth-User', None)) if user_info.api_key != headers.get('X-Auth-Token', None): raise exceptions.AuthenticationError if user_info.is_root: ctx.is_admin = True ctx.is_admin_project = True elif user_info.is_admin: ctx.is_admin = True ctx.is_admin_project = False else: ctx.is_admin = False ctx.is_admin_project = False except exceptions.NotFound: raise exceptions.AuthenticationError
def validate(value): if not uuidutils.is_uuid_like(value): raise exception.InvalidUUID(uuid=value) return value
def validate(value): if not (uuidutils.is_uuid_like(value) or v1_utils.is_valid_logical_name(value)): raise exception.InvalidUuidOrName(name=value) return value
def is_valid_board_name(name): """Determine if the provided name is a valid board name. Check to see that the provided board name is valid, and isn't a UUID. :param: name: the board name to check. :returns: True if the name is valid, False otherwise. """ return utils.is_hostname_safe(name) and (not uuidutils.is_uuid_like(name))
def is_valid_name(name): """Determine if the provided name is a valid name. Check to see that the provided board name isn't a UUID. :param: name: the board name to check. :returns: True if the name is valid, False otherwise. """ return not uuidutils.is_uuid_like(name)
def get(cls, context, plugin_id): """Find a plugin based on its id or uuid and return a Board object. :param plugin_id: the id *or* uuid of a plugin. :returns: a :class:`Board` object. """ if strutils.is_int_like(plugin_id): return cls.get_by_id(context, plugin_id) elif uuidutils.is_uuid_like(plugin_id): return cls.get_by_uuid(context, plugin_id) else: raise exception.InvalidIdentity(identity=plugin_id)
def get(cls, context, session_or_board_uuid): """Find a session based on its id or uuid and return a SessionWP object. :param session_id: the id *or* uuid of a session. :returns: a :class:`SessionWP` object. """ if strutils.is_int_like(session_or_board_uuid): return cls.get_by_id(context, session_or_board_uuid) elif uuidutils.is_uuid_like(session_or_board_uuid): return cls.get_by_uuid(context, session_or_board_uuid) else: raise exception.InvalidIdentity(identity=session_or_board_uuid)
def test_request_id(self): response = self.app.get('/v2.1/') self.assertIn('x-openstack-request-id', response.headers) self.assertTrue( response.headers['x-openstack-request-id'].startswith('req-')) id_part = response.headers['x-openstack-request-id'].split('req-')[1] self.assertTrue(uuidutils.is_uuid_like(id_part))
def test_request_id(self): response = self.app.get('/') self.assertIn('x-openstack-request-id', response.headers) self.assertTrue( response.headers['x-openstack-request-id'].startswith('req-')) id_part = response.headers['x-openstack-request-id'].split('req-')[1] self.assertTrue(uuidutils.is_uuid_like(id_part))
def param2id(object_id): """Helper function to convert various id types to internal id. args: [object_id], e.g. 'vol-0000000a' or 'volume-0000000a' or '10' """ if uuidutils.is_uuid_like(object_id): return object_id elif '-' in object_id: # FIXME(ja): mapping occurs in nova? pass else: return int(object_id)
def get_neutron_network(self, network): if uuidutils.is_uuid_like(network): networks = self.neutron.list_networks(id=network)['networks'] else: networks = self.neutron.list_networks(name=network)['networks'] if len(networks) == 0: raise exception.NetworkNotFound(network=network) elif len(networks) > 1: raise exception.Conflict(_( 'Multiple neutron networks exist with same name. ' 'Please use the uuid instead.')) network = networks[0] return network
def get_neutron_port(self, port): if uuidutils.is_uuid_like(port): ports = self.list_ports(id=port)['ports'] else: ports = self.list_ports(name=port)['ports'] if len(ports) == 0: raise exception.PortNotFound(port=port) elif len(ports) > 1: raise exception.Conflict(_( 'Multiple neutron ports exist with same name. ' 'Please use the uuid instead.')) port = ports[0] return port
def get_resource_provider(self, context, provider_ident): if uuidutils.is_uuid_like(provider_ident): return self._get_resource_provider_by_uuid(context, provider_ident) else: return self._get_resource_provider_by_name(context, provider_ident)
def get_resource_class(self, context, resource_ident): if uuidutils.is_uuid_like(resource_ident): return self._get_resource_class_by_uuid(context, resource_ident) else: return self._get_resource_class_by_name(context, resource_ident)
def get_resource_class(self, context, ident): if uuidutils.is_uuid_like(ident): return self._get_resource_class_by_uuid(context, ident) else: return self._get_resource_class_by_name(context, ident)
def search_volume(self, volume): if uuidutils.is_uuid_like(volume): volume = self.cinder.volumes.get(volume) else: try: volume = self.cinder.volumes.find(name=volume) except cinder_exception.NotFound: raise exception.VolumeNotFound(volume=volume) except cinder_exception.NoUniqueMatch: raise exception.Conflict(_( 'Multiple cinder volumes exist with same name. ' 'Please use the uuid instead.')) return volume
def validate(cls, value): if not uuidutils.is_uuid_like(value): raise exception.InvalidUUID(uuid=value) return value
def _check_security_group(self, context, security_group, container): if security_group.get("uuid"): security_group_id = security_group.get("uuid") if not uuidutils.is_uuid_like(security_group_id): raise exception.InvalidUUID(uuid=security_group_id) if security_group_id in container.security_groups: msg = _("security_group %s already present in container") % \ security_group_id raise exception.InvalidValue(msg) else: security_group_ids = utils.get_security_group_ids( context, [security_group['name']]) if len(security_group_ids) > len(security_group): msg = _("Multiple security group matches " "found for name %(name)s, use an ID " "to be more specific. ") % security_group raise exception.Conflict(msg) else: security_group_id = security_group_ids[0] container_ports_detail = utils.list_ports(context, container) for container_port_detail in container_ports_detail: if security_group_id in container_port_detail['security_groups']: msg = _("security_group %s already present in container") % \ list(security_group.values())[0] raise exception.InvalidValue(msg) return security_group_id
def get_resource(resource, resource_ident): """Get the resource from the uuid or logical name. :param resource: the resource type. :param resource_ident: the UUID or logical name of the resource. :returns: The resource. """ resource = getattr(objects, resource) if uuidutils.is_uuid_like(resource_ident): return resource.get_by_uuid(pecan.request.context, resource_ident) return resource.get_by_name(pecan.request.context, resource_ident)
def check_active(self, server): """Check server status. Accepts both server IDs and server objects. Returns True if server is ACTIVE, raises errors when server has an ERROR or unknown to Zun status, returns False otherwise. """ # not checking with is_uuid_like as most tests use strings e.g. '1234' if isinstance(server, six.string_types): server = self.fetch_server(server) if server is None: return False else: status = self.get_status(server) else: status = self.get_status(server) if status != 'ACTIVE': self.refresh_server(server) status = self.get_status(server) if status in self.deferred_server_statuses: return False elif status == 'ACTIVE': return True elif status == 'ERROR': fault = getattr(server, 'fault', {}) raise exception.ServerInError( resource_status=status, status_reason=_("Message: %(message)s, Code: %(code)s") % { 'message': fault.get('message', _('Unknown')), 'code': fault.get('code', _('Unknown')) }) else: raise exception.ServerUnknownStatus( resource_status=server.status, status_reason=_('Unknown'), result=_('Server is not active'))
def get_server_id(self, server, raise_on_error=True): if uuidutils.is_uuid_like(server): return server elif isinstance(server, six.string_types): servers = self.client().servers.list(search_opts={'name': server}) if len(servers) == 1: return servers[0].id if raise_on_error: raise exception.ZunException(_( "Unable to get server id with name %s") % server) else: raise exception.ZunException(_("Unexpected server type"))
def resource_setup(cls): super(BaremetalStandaloneScenarioTest, cls).resource_setup() base.set_baremetal_api_microversion(cls.api_microversion) for v in cls.mandatory_attr: if getattr(cls, v) is None: raise lib_exc.InvalidConfiguration( "Mandatory attribute %s not set." % v) image_checksum = None if not uuidutils.is_uuid_like(cls.image_ref): image_checksum = cls.image_checksum cls.node = cls.boot_node(cls.driver, cls.image_ref, image_checksum=image_checksum) cls.node_ip = cls.add_floatingip_to_node(cls.node['uuid'])
def test_is_uuid_like(self): self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4()))) self.assertTrue(uuidutils.is_uuid_like( '{12345678-1234-5678-1234-567812345678}')) self.assertTrue(uuidutils.is_uuid_like( '12345678123456781234567812345678')) self.assertTrue(uuidutils.is_uuid_like( 'urn:uuid:12345678-1234-5678-1234-567812345678')) self.assertTrue(uuidutils.is_uuid_like( 'urn:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb')) self.assertTrue(uuidutils.is_uuid_like( 'uuid:bbbaaaaa-aaaa-aaaa-aabb-bbbbbbbbbbbb')) self.assertTrue(uuidutils.is_uuid_like( '{}---bbb---aaa--aaa--aaa-----aaa---aaa--bbb-bbb---bbb-bbb-bb-{}'))
def test_is_uuid_like_insensitive(self): self.assertTrue(uuidutils.is_uuid_like(str(uuid.uuid4()).upper()))
def test_id_is_uuid_like(self): self.assertFalse(uuidutils.is_uuid_like(1234567))
def test_name_is_uuid_like(self): self.assertFalse(uuidutils.is_uuid_like('zhongyueluo'))
def _is_uuid_uri(self, uri): return uuidutils.is_uuid_like( urlparse.urlparse(uri).path.split('/')[-1])
def validate(value): if not uuidutils.is_uuid_like(value): raise exc.InputException( "Expected a uuid but received %s." % value ) return value
def _get_access_param(self, context, protocol, creds): if const.PROTOCOL_SNMP in protocol: if not uuidutils.is_uuid_like(creds): access_parameters = db.get_snmp_cred_by_name_and_protocol( context, creds, protocol) else: access_parameters = db.get_snmp_cred_by_id(context, creds) else: if not uuidutils.is_uuid_like(creds): access_parameters = db.get_netconf_cred_by_name_and_protocol( context, creds, protocol) else: access_parameters = db.get_netconf_cred_by_id(context, creds) if not access_parameters: raise webob.exc.HTTPBadRequest( _("Credentials not found for Id or name: %s") % creds) if isinstance(access_parameters, list) and len(access_parameters) > 1: raise webob.exc.HTTPBadRequest( _("Multiple credentials matches found " "for name: %s, use an ID to be more specific.") % creds) if isinstance(access_parameters, list): access_parameters = access_parameters[0] if access_parameters['protocol_type'] != protocol: raise webob.exc.HTTPBadRequest( _("Credentials not found for Id or name: %s") % creds) return access_parameters
def _get_credentials_dict(self, bnp_switch, func_name): if not bnp_switch: self._raise_ml2_error(wexc.HTTPNotFound, func_name) db_context = neutron_context.get_admin_context() creds_dict = {} creds_dict['ip_address'] = bnp_switch.ip_address prov_creds = bnp_switch.credentials prov_protocol = bnp_switch.management_protocol if hp_const.PROTOCOL_SNMP in prov_protocol: if not uuidutils.is_uuid_like(prov_creds): snmp_cred = db.get_snmp_cred_by_name(db_context, prov_creds) snmp_cred = snmp_cred[0] else: snmp_cred = db.get_snmp_cred_by_id(db_context, prov_creds) if not snmp_cred: LOG.error(_LE("Credentials does not match")) self._raise_ml2_error(wexc.HTTPNotFound, '') creds_dict['write_community'] = snmp_cred.write_community creds_dict['security_name'] = snmp_cred.security_name creds_dict['security_level'] = snmp_cred.security_level creds_dict['auth_protocol'] = snmp_cred.auth_protocol creds_dict['management_protocol'] = prov_protocol creds_dict['auth_key'] = snmp_cred.auth_key creds_dict['priv_protocol'] = snmp_cred.priv_protocol creds_dict['priv_key'] = snmp_cred.priv_key else: if not uuidutils.is_uuid_like(prov_creds): netconf_cred = db.get_netconf_cred_by_name(db_context, prov_creds) else: netconf_cred = db.get_netconf_cred_by_id(db_context, prov_creds) if not netconf_cred: LOG.error(_LE("Credentials does not match")) self._raise_ml2_error(wexc.HTTPNotFound, '') creds_dict['user_name'] = netconf_cred.write_community creds_dict['password'] = netconf_cred.security_name creds_dict['key_path'] = netconf_cred.security_level return creds_dict
def validate_access_parameters(body): """Validate if the request body is in proper format.""" protocol_dict = deepcopy(body) if const.NAME not in protocol_dict.keys(): raise webob.exc.HTTPBadRequest( _("Name not found in request body")) if uuidutils.is_uuid_like(protocol_dict['name']): raise webob.exc.HTTPBadRequest( _("Name=%s should not be in uuid format") % protocol_dict['name']) protocol_dict.pop('name') keys = list(protocol_dict.keys()) if not len(keys): raise webob.exc.HTTPBadRequest( _("Request body should have at least one protocol specified")) elif len(keys) > 1: raise webob.exc.HTTPBadRequest( _("multiple protocols in a single request is not supported")) key = keys[0] if key.lower() not in const.SUPPORTED_PROTOCOLS: raise webob.exc.HTTPBadRequest( _("'protocol %s' is not supported") % keys) if key.lower() == const.SNMP_V3: return validate_snmpv3_parameters(protocol_dict, key) elif key.lower() in [const.NETCONF_SSH, const.NETCONF_SOAP]: return validate_netconf_parameters(protocol_dict, key) else: return validate_snmp_parameters(protocol_dict, key)
def validate_access_parameters_for_update(body): """Validate if the request body is in proper format.""" protocol_dict = deepcopy(body) if (const.NAME not in protocol_dict.keys() and not len(protocol_dict.keys())): raise webob.exc.HTTPBadRequest( _("Request must have name or one protocol type")) if const.NAME in protocol_dict.keys(): if uuidutils.is_uuid_like(protocol_dict['name']): raise webob.exc.HTTPBadRequest( _("Name=%s should not be in uuid format") % protocol_dict['name']) protocol_dict.pop('name') if protocol_dict: keys = list(protocol_dict.keys()) if len(keys) > 1: raise webob.exc.HTTPBadRequest( _("Multiple protocols in a single request is not supported")) key = keys[0] if key.lower() not in const.SUPPORTED_PROTOCOLS: raise webob.exc.HTTPBadRequest( _("Protocol %s is not supported") % keys) if key.lower() == const.SNMP_V3: return validate_snmpv3_parameters_for_update(protocol_dict, key) elif key.lower() in [const.NETCONF_SSH, const.NETCONF_SOAP]: return validate_netconf_parameters_for_update(protocol_dict, key) else: return validate_snmp_parameters_for_update(protocol_dict, key) else: return None
def _validate_uuid_format(uid): return uuidutils.is_uuid_like(uid)
def take_action(self, parsed_args): client = self.app.client_manager.data_protection if not uuidutils.is_uuid_like(parsed_args.trigger_id): raise exceptions.CommandError( "Invalid trigger id provided.") so = client.scheduled_operations.create( parsed_args.name, parsed_args.operation_type, parsed_args.trigger_id, parsed_args.operation_definition) format_scheduledoperation(so._info) return zip(*sorted(six.iteritems(so._info)))
def take_action(self, parsed_args): client = self.app.client_manager.data_protection if not uuidutils.is_uuid_like(parsed_args.provider_id): raise exceptions.CommandError( "Invalid provider id provided.") if not uuidutils.is_uuid_like(parsed_args.checkpoint_id): raise exceptions.CommandError( "Invalid checkpoint id provided.") restore_parameters = utils.extract_parameters(parsed_args) restore_auth = None if parsed_args.restore_target is not None: if parsed_args.restore_username is None: raise exceptions.CommandError( "Must specify username for restore_target.") if parsed_args.restore_password is None: raise exceptions.CommandError( "Must specify password for restore_target.") restore_auth = { 'type': 'password', 'username': parsed_args.restore_username, 'password': parsed_args.restore_password, } restore = client.restores.create(parsed_args.provider_id, parsed_args.checkpoint_id, parsed_args.restore_target, restore_parameters, restore_auth) format_restore(restore._info) return zip(*sorted(restore._info.items()))