我们从 Python 开源项目中提取了以下 16 个代码示例，用于说明如何使用 libvirt.VIR_CRED_PASSPHRASE。
def __enter__(self):
    """Open the libvirt connection for ``self.uri`` and return it.

    When both ``self.sasl_username`` and ``self.sasl_password`` are set, the
    connection is opened through ``libvirt.openAuth`` with a callback that
    supplies those two values. Otherwise a plain read-only or read-write
    connection is opened, depending on ``self.readonly``. The connection is
    also stored on ``self.conn``.

    :returns: the opened connection
    :raises exception.LibvirtConnectionOpenError: when libvirt raises
        ``libvirtError`` while opening the connection
    """
    try:
        # Guard clause: without a full SASL user/password pair no
        # authentication callback is needed.
        if not (self.sasl_username and self.sasl_password):
            if self.readonly:
                self.conn = libvirt.openReadOnly(self.uri)
            else:
                self.conn = libvirt.open(self.uri)
            return self.conn

        def supply_sasl(requests, _opaque):
            # Write the answer for each requested credential into slot 4.
            for request in requests:
                kind = request[0]
                if kind == libvirt.VIR_CRED_AUTHNAME:
                    request[4] = self.sasl_username
                elif kind == libvirt.VIR_CRED_PASSPHRASE:
                    request[4] = self.sasl_password
            return 0

        wanted = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
        auth = [wanted, supply_sasl, None]
        flags = 0
        if self.readonly:
            flags = libvirt.VIR_CONNECT_RO
        self.conn = libvirt.openAuth(self.uri, auth, flags)
        return self.conn
    except libvirt.libvirtError as e:
        raise exception.LibvirtConnectionOpenError(uri=self.uri, error=e)
def retrieve_credentials(self, credentials, user_data):
    """Answer the credential requests libvirt makes during authentication.

    This method is meant to be passed as the callback element of the
    ``auth`` list given to ``libvirt.openAuth``. Each entry of
    *credentials* is a mutable sequence: index 0 is the credential type,
    index 3 is used here as the fallback user name (the default result in
    libvirt's documented credential layout), and index 4 receives the
    answer.

    :param credentials: sequence of credential request entries to fill in
    :param user_data: opaque value from the ``auth`` list; unused
    :type user_data: None
    :returns: 0 when every request was answered, -1 as soon as a request
        of a type other than user name or passphrase is encountered
    """
    for credential in credentials:
        if credential[0] == libvirt.VIR_CRED_AUTHNAME:
            # Fall back to the default when no user name is configured.
            # A truthiness test covers both "" and None; the former
            # ``len(...) == 0`` check raised TypeError on None.
            credential[4] = self.USERNAME or credential[3]
        elif credential[0] == libvirt.VIR_CRED_PASSPHRASE:
            credential[4] = self.PASSWORD
        else:
            # Unsupported credential type: signal failure to the caller.
            return -1
    return 0
def __libvirt_auth_credentials_callback(self, credentials, user_data):
    """Fill in the user name and passphrase libvirt asks for.

    Each entry of *credentials* is a mutable sequence: index 0 is the
    credential type, index 3 is used as the fallback user name (the default
    result in libvirt's documented credential layout), and index 4 receives
    the answer.

    :param credentials: sequence of credential request entries to fill in
    :param user_data: opaque value from the ``auth`` list; unused
    :returns: 0 when every request was answered, -1 as soon as a request
        of a type other than user name or passphrase is encountered
    """
    for credential in credentials:
        if credential[0] == libvirt.VIR_CRED_AUTHNAME:
            # Use the default when no login is configured. Truthiness
            # covers both "" and None; ``len(None)`` used to raise TypeError.
            credential[4] = self.login or credential[3]
        elif credential[0] == libvirt.VIR_CRED_PASSPHRASE:
            credential[4] = self.passwd
        else:
            # Unsupported credential type: signal failure to the caller.
            return -1
    return 0
def __connect_tcp(self):
    """Open an authenticated ``qemu+tcp`` connection to ``self.host``.

    :returns: the connection returned by ``libvirt.openAuth``. When
        ``libvirtError`` is raised, the message is stored in
        ``self.last_error``, ``self.connection`` is reset to ``None`` and
        ``None`` is returned.
    """
    requested = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
    auth = [requested, self.__libvirt_auth_credentials_callback, None]
    uri = 'qemu+tcp://%s/system' % self.host
    try:
        opened = libvirt.openAuth(uri, auth, 0)
    except libvirtError as exc:
        message = 'Connection Failed: ' + str(exc)
        self.last_error = message
        self.connection = None
        return None
    return opened
def __connect_tcp(self):
    """Connect to ``self.host`` over ``qemu+tcp`` and record the outcome.

    On success the connection is stored in ``self.connection`` and
    ``self.last_error`` is cleared. When ``libvirtError`` is raised, the
    message is stored in ``self.last_error`` and ``self.connection`` is
    set to ``None``. Nothing is returned.
    """
    auth = [
        [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE],
        self.__libvirt_auth_credentials_callback,
        None,
    ]
    uri = 'qemu+tcp://%s/system' % self.host
    try:
        opened = libvirt.openAuth(uri, auth, 0)
    except libvirtError as exc:
        self.last_error = 'Connection Failed: ' + str(exc)
        self.connection = None
    else:
        self.connection = opened
        self.last_error = None
def _get_auth_conn(config):
    """Open an authenticated libvirt connection described by *config*.

    :param config: mapping read for ``"uri"`` (required) and, inside the
        credential callback, ``"username"`` and ``"password"`` (via ``get``)
    :returns: the result of ``libvirt.openAuth`` for ``config["uri"]``
    """

    def _fill(requests, _opaque):
        # Write the configured answer for each request into slot 4.
        for request in requests:
            requested = request[0]
            if requested == libvirt.VIR_CRED_AUTHNAME:
                request[4] = config.get("username")
                continue
            if requested == libvirt.VIR_CRED_PASSPHRASE:
                request[4] = config.get("password")
        return 0

    credential_types = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
    auth = [credential_types, _fill, None]
    return libvirt.openAuth(config["uri"], auth, 0)
def request_cred(self, credentials, user_data):
    """Supply the host user name and password to libvirt.

    For every entry in *credentials*, the entry's type (index 0) selects
    whether ``self.host_username`` or ``self.host_password`` is written to
    index 4. Entries of any other type are left untouched.

    :param credentials: sequence of credential request entries to fill in
    :param user_data: unused
    :returns: always 0
    """
    for entry in credentials:
        kind = entry[0]
        if kind == libvirt.VIR_CRED_AUTHNAME:
            entry[4] = self.host_username
            continue
        if kind == libvirt.VIR_CRED_PASSPHRASE:
            entry[4] = self.host_password
    return 0
def connect(self):
    """Open a libvirt connection according to ``self.host_protocol``.

    Supported protocols are ``'libssh2'`` (password authentication through
    ``self.request_cred``), ``'ssh'`` (key file) and ``'qemu'`` (local
    ``qemu:///system``). For the first two, a missing ``self.host_url``
    falls back to the local ``qemu:///system`` URI.

    On success the connection is stored in ``self.conn`` and in
    ``SETTINGS['connection']``.

    :returns: ``False`` when ``self.conn_status`` is ``False`` or when no
        connection object was obtained (including an unrecognised
        protocol); ``None`` (implicitly) on success.
    """
    if self.conn_status is False:
        return False

    conn = None
    if self.host_protocol == 'libssh2':
        self.auth = [
            [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE],
            self.request_cred,
            None,
        ]
        if self.host_url is None:
            conn = libvirt.open("qemu:///system")
        else:
            url = "{}://{}/?sshauth=password".format(
                VIRT_CONN_LIBSSH2, self.host_url)
            conn = libvirt.openAuth(url, self.auth, 0)
    elif self.host_protocol == 'ssh':
        if self.host_url is None:
            conn = libvirt.open("qemu:///system")
        else:
            url = "{}://{}@{}/system?socket=/var/run/libvirt/libvirt-sock&keyfile={}".format(
                VIRT_CONN_SSH, self.host_username, self.host_url,
                self.host_key)
            conn = libvirt.open(url)
    elif self.host_protocol == 'qemu':
        conn = libvirt.open("qemu:///system")

    # NOTE(review): libvirt's Python bindings appear to raise libvirtError
    # on failure rather than return None, so this branch is presumably
    # reached mainly for an unrecognised protocol -- confirm.
    if conn is None:
        logging.error('Connection to hypervisor failed!')
        return False

    logging.info('Connection succesfull.')
    self.conn = conn
    SETTINGS['connection'] = self.conn
def __connect(self):
    """Establish an authenticated connection to the hypervisor.

    The connection returned by ``libvirt.openAuth`` for ``self.URI`` is
    stored in ``self.SESSION``.

    :raises SessionException: when ``libvirt.openAuth`` yields ``None``
    """
    # openAuth expects a three-item list: the credential types to request,
    # the callback that answers them, and an opaque user-data value.
    auth = [
        [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE],
        self.retrieve_credentials,
        None,
    ]
    self.SESSION = libvirt.openAuth(self.URI, auth, 0)
    if self.SESSION is None:
        raise SessionException("Unable to establish connection to hypervisor!")
def test_open_auth_ignored(self):
    """``convirt.openAuth`` yields a truthy connection for this auth list."""

    def answer(requests, _opaque):
        # Supply a fixed user name and passphrase for whatever is requested.
        for request in requests:
            kind = request[0]
            if kind == libvirt.VIR_CRED_AUTHNAME:
                request[4] = 'convirt'
            elif kind == libvirt.VIR_CRED_PASSPHRASE:
                request[4] = 'ignored'
        return 0

    requested = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
    auth = [requested, answer, None]
    conn = convirt.openAuth('convirt:///system', auth)
    self.assertTrue(conn)
def auth_callback(credentials, user_data):
    """Answer libvirt credential requests from the module-level ``config``.

    User-name requests receive ``config.get('libvirt_username')`` and
    passphrase requests receive ``config.get('libvirt_password')``; each
    answer is written to index 4 of the request entry. Other request types
    are left untouched.

    :param credentials: sequence of credential request entries to fill in
    :param user_data: unused
    :returns: always 0
    """
    for entry in credentials:
        requested = entry[0]
        if requested == libvirt.VIR_CRED_AUTHNAME:
            entry[4] = config.get('libvirt_username')
            continue
        if requested == libvirt.VIR_CRED_PASSPHRASE:
            entry[4] = config.get('libvirt_password')
    return 0
def get_libvirt_connection(name, libvirt_url='qemu:///system'):
    """Return the cached libvirt connection for *name*, opening it if needed.

    Connections are kept in ``LIBVIRT_CONNECTIONS`` keyed by *name*. A new
    one is opened with ``libvirt.openAuth`` and ``auth_callback`` only when
    *name* has no entry yet; *libvirt_url* is ignored for an existing entry.

    :param name: cache key for the connection
    :param libvirt_url: URI used when a new connection has to be opened
    :returns: the cached or newly opened connection
    """
    if name in LIBVIRT_CONNECTIONS:
        return LIBVIRT_CONNECTIONS[name]

    requested = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
    auth = [requested, auth_callback, None]
    LIBVIRT_CONNECTIONS[name] = libvirt.openAuth(libvirt_url, auth)
    return LIBVIRT_CONNECTIONS[name]
def __connect_tcp(self):
    """Connect to ``self.host`` over ``qemu+tcp``, logging any failure.

    On success the connection is stored in ``self.connection`` and
    ``self.last_error`` is cleared. When ``libvirtError`` is raised, the
    message is stored in ``self.last_error``, passed to ``log_error``, and
    ``self.connection`` is set to ``None``. Nothing is returned.
    """
    credential_types = [libvirt.VIR_CRED_AUTHNAME, libvirt.VIR_CRED_PASSPHRASE]
    auth = [
        credential_types,
        self.__libvirt_auth_credentials_callback,
        None,
    ]
    uri = 'qemu+tcp://%s/system' % self.host
    try:
        self.connection = libvirt.openAuth(uri, auth, 0)
        self.last_error = None
    except libvirtError as exc:
        reason = str(exc)
        self.last_error = 'Connection Failed: ' + reason
        log_error(self.last_error)
        self.connection = None