Python paramiko 模块,RSAKey() 实例源码

我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用paramiko.RSAKey()

项目:jumpserver-python-sdk    作者:jumpserver    | 项目源码 | 文件源码
def from_string(cls, key_string):
        try:
            pkey = paramiko.RSAKey(file_obj=StringIO(key_string))
            return pkey
        except paramiko.SSHException:
            try:
                pkey = paramiko.DSSKey(file_obj=StringIO(key_string))
                return pkey
            except paramiko.SSHException:
                return None
项目:coco    作者:jumpserver    | 项目源码 | 文件源码
def ssh_key_gen(length=2048, type='rsa', password=None, username='jumpserver', hostname=None):
    """Generate user ssh private and public key

    Use paramiko RSAKey generate it.
    :return private key str and public key str
    """

    if hostname is None:
        hostname = os.uname()[1]

    f = StringIO()

    try:
        if type == 'rsa':
            private_key_obj = paramiko.RSAKey.generate(length)
        elif type == 'dsa':
            private_key_obj = paramiko.DSSKey.generate(length)
        else:
            raise IOError('SSH private key must be `rsa` or `dsa`')
        private_key_obj.write_private_key(f, password=password)
        private_key = f.getvalue()
        public_key = ssh_pubkey_gen(private_key_obj, username=username, hostname=hostname)
        return private_key, public_key
    except IOError:
        raise IOError('These is error when generate ssh key.')
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_4_auto_add_policy(self):
        """
        verify that SSHClient's AutoAddPolicy works.
        """
        threading.Thread(target=self._run).start()
        host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.assertEqual(0, len(self.tc.get_host_keys()))
        self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')

        self.event.wait(1.0)
        self.assertTrue(self.event.is_set())
        self.assertTrue(self.ts.is_active())
        self.assertEqual('slowdive', self.ts.get_username())
        self.assertEqual(True, self.ts.is_authenticated())
        self.assertEqual(1, len(self.tc.get_host_keys()))
        self.assertEqual(public_host_key, self.tc.get_host_keys()['[%s]:%d' % (self.addr, self.port)]['ssh-rsa'])
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_7_banner_timeout(self):
        """
        verify that the SSHClient has a configurable banner timeout.
        """
        # Start the thread with a 1 second wait.
        threading.Thread(target=self._run, kwargs={'delay': 1}).start()
        host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)
        # Connect with a half second banner timeout.
        self.assertRaises(
            paramiko.SSHException,
            self.tc.connect,
            self.addr,
            self.port,
            username='slowdive',
            password='pygmalion',
            banner_timeout=0.5
        )
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_4_dict_set(self):
        hostdict = paramiko.HostKeys('hostfile.temp')
        key = paramiko.RSAKey(data=decodebytes(keyblob))
        key_dss = paramiko.DSSKey(data=decodebytes(keyblob_dss))
        hostdict['secure.example.com'] = {
            'ssh-rsa': key,
            'ssh-dss': key_dss
        }
        hostdict['fake.example.com'] = {}
        hostdict['fake.example.com']['ssh-rsa'] = key

        self.assertEqual(3, len(hostdict))
        self.assertEqual(2, len(list(hostdict.values())[0]))
        self.assertEqual(1, len(list(hostdict.values())[1]))
        self.assertEqual(1, len(list(hostdict.values())[2]))
        fp = hexlify(hostdict['secure.example.com']['ssh-rsa'].get_fingerprint()).upper()
        self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fp)
        fp = hexlify(hostdict['secure.example.com']['ssh-dss'].get_fingerprint()).upper()
        self.assertEqual(b'4478F0B9A23CC5182009FF755BC1D26C', fp)
项目:shakedown    作者:dcos    | 项目源码 | 文件源码
def validate_key(key_path):
    """ Validate a key

        :param key_path: path to a key to use for authentication
        :type key_path: str

        :return: key object used for authentication
        :rtype: paramiko.RSAKey
    """

    key_path = os.path.expanduser(key_path)

    if not os.path.isfile(key_path):
        return False

    return paramiko.RSAKey.from_private_key_file(key_path)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def generate_key(bits):
    """Generate a paramiko RSAKey"""
    # NOTE(dims): pycryptodome has changed the signature of the RSA.generate
    # call. specifically progress_func has been dropped. paramiko still uses
    # pycrypto. However some projects like latest pysaml2 have switched from
    # pycrypto to pycryptodome as pycrypto seems to have been abandoned.
    # paramiko project has started transition to pycryptodome as well but
    # there is no release yet with that support. So at the moment depending on
    # which version of pysaml2 is installed, Nova is likely to break. So we
    # call "RSA.generate(bits)" which works on both pycrypto and pycryptodome
    # and then wrap it into a paramiko.RSAKey
    rsa = RSA.generate(bits)
    key = paramiko.RSAKey(vals=(rsa.e, rsa.n))
    key.d = rsa.d
    key.p = rsa.p
    key.q = rsa.q
    return key
项目:netconf-proxy    作者:fortinet-solutions-cse    | 项目源码 | 文件源码
def setup_travis ():
    import getpass
    import sys
    global private_key                                      # pylint: disable=W0603

    logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
    print("Setup called.")
    if 'USER' in os.environ:
        if os.environ['USER'] != "travis":
            return
    else:
        if getpass.getuser() != "travis":
            return

    print("Executing under Travis-CI")
    ssh_dir = "{}/.ssh".format(os.environ['HOME'])
    priv_filename = os.path.join(ssh_dir, "id_rsa")
    if os.path.exists(priv_filename):
        logger.error("Found private keyfile")
        print("Found private keyfile")
        return
    else:
        logger.error("Creating ssh dir " + ssh_dir)
        print("Creating ssh dir " + ssh_dir)
        os.system("mkdir -p {}".format(ssh_dir))
        priv = ssh.RSAKey.generate(bits=1024)
        private_key = priv

        logger.error("Generating private keyfile " + priv_filename)
        print("Generating private keyfile " + priv_filename)
        priv.write_private_key_file(filename=priv_filename)

        pub = ssh.RSAKey(filename=priv_filename)
        auth_filename = os.path.join(ssh_dir, "authorized_keys")
        logger.error("Adding keys to authorized_keys file " + auth_filename)
        print("Adding keys to authorized_keys file " + auth_filename)
        with open(auth_filename, "a") as authfile:
            authfile.write("{} {}\n".format(pub.get_name(), pub.get_base64()))
        logger.error("Done generating keys")
        print("Done generating keys")
项目:coco    作者:jumpserver    | 项目源码 | 文件源码
def ssh_key_string_to_obj(text):
    key_f = StringIO(text)
    key = None
    try:
        key = paramiko.RSAKey.from_private_key(key_f)
    except paramiko.SSHException:
        pass

    try:
        key = paramiko.DSSKey.from_private_key(key_f)
    except paramiko.SSHException:
        pass
    return key
项目:coco    作者:jumpserver    | 项目源码 | 文件源码
def get_host_key(cls):
        logger.debug("Get ssh server host key")
        if not os.path.isfile(cls.host_key_path):
            cls.host_key_gen()
        return paramiko.RSAKey(filename=cls.host_key_path)
项目:mock-ssh-server    作者:carletes    | 项目源码 | 文件源码
def __init__(self, server, client_conn):
        self.server = server
        self.thread = None
        self.command_queues = {}
        client, _ = client_conn
        self.transport = t = paramiko.Transport(client)
        t.add_server_key(paramiko.RSAKey(filename=SERVER_KEY_PATH))
        t.set_subsystem_handler("sftp", sftp.SFTPServer)
项目:mock-ssh-server    作者:carletes    | 项目源码 | 文件源码
def add_user(self, uid, private_key_path):
        k = paramiko.RSAKey.from_private_key_file(private_key_path)
        self._users[uid] = (private_key_path, k)
项目:mock-ssh-server    作者:carletes    | 项目源码 | 文件源码
def client(self, uid):
        private_key_path, _ = self._users[uid]
        c = paramiko.SSHClient()
        host_keys = c.get_host_keys()
        key = paramiko.RSAKey.from_private_key_file(SERVER_KEY_PATH)
        host_keys.add(self.host, "ssh-rsa", key)
        host_keys.add("[%s]:%d" % (self.host, self.port), "ssh-rsa", key)
        c.set_missing_host_key_policy(paramiko.RejectPolicy())
        c.connect(hostname=self.host,
                  port=self.port,
                  username=uid,
                  key_filename=private_key_path,
                  allow_agent=False,
                  look_for_keys=False)
        return c
项目:tcp-qa    作者:Mirantis    | 项目源码 | 文件源码
def generate_keys():
    file_obj = StringIO.StringIO()
    key = paramiko.RSAKey.generate(1024)
    key.write_private_key(file_obj)
    public = key.get_base64()
    private = file_obj.getvalue()
    file_obj.close()
    return {'private': private,
            'public': public}
项目:tcp-qa    作者:Mirantis    | 项目源码 | 文件源码
def load_keyfile(file_path):
    with open(file_path, 'r') as private_key_file:
        private = private_key_file.read()
    key = paramiko.RSAKey(file_obj=StringIO.StringIO(private))
    public = key.get_base64()
    return {'private': private,
            'public': public}
项目:tcp-qa    作者:Mirantis    | 项目源码 | 文件源码
def dump_keyfile(file_path, key):
    key = paramiko.RSAKey(file_obj=StringIO.StringIO(key['private']))
    key.write_private_key_file(file_path)
    os.chmod(file_path, 0o644)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def _run(self, allowed_keys=None, delay=0):
        if allowed_keys is None:
            allowed_keys = FINGERPRINTS.keys()
        self.socks, addr = self.sockl.accept()
        self.ts = paramiko.Transport(self.socks)
        host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        self.ts.add_server_key(host_key)
        server = NullServer(allowed_keys=allowed_keys)
        if delay:
            time.sleep(delay)
        self.ts.start_server(self.event, server)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_5_save_host_keys(self):
        """
        verify that SSHClient correctly saves a known_hosts file.
        """
        warnings.filterwarnings('ignore', 'tempnam.*')

        host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())
        fd, localname = mkstemp()
        os.close(fd)

        client = paramiko.SSHClient()
        self.assertEquals(0, len(client.get_host_keys()))

        host_id = '[%s]:%d' % (self.addr, self.port)

        client.get_host_keys().add(host_id, 'ssh-rsa', public_host_key)
        self.assertEquals(1, len(client.get_host_keys()))
        self.assertEquals(public_host_key, client.get_host_keys()[host_id]['ssh-rsa'])

        client.save_host_keys(localname)

        with open(localname) as fd:
            assert host_id in fd.read()

        os.unlink(localname)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_6_cleanup(self):
        """
        verify that when an SSHClient is collected, its transport (and the
        transport's packetizer) is closed.
        """
        # Unclear why this is borked on Py3, but it is, and does not seem worth
        # pursuing at the moment.
        # XXX: It's the release of the references to e.g packetizer that fails
        # in py3...
        if not PY2:
            return
        threading.Thread(target=self._run).start()
        host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.assertEqual(0, len(self.tc.get_host_keys()))
        self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion')

        self.event.wait(1.0)
        self.assertTrue(self.event.is_set())
        self.assertTrue(self.ts.is_active())

        p = weakref.ref(self.tc._transport.packetizer)
        self.assertTrue(p() is not None)
        self.tc.close()
        del self.tc

        # hrm, sometimes p isn't cleared right away.  why is that?
        #st = time.time()
        #while (time.time() - st < 5.0) and (p() is not None):
        #    time.sleep(0.1)

        # instead of dumbly waiting for the GC to collect, force a collection
        # to see whether the SSHClient object is deallocated correctly
        import gc
        gc.collect()

        self.assertTrue(p() is None)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def _run(self):
        self.socks, addr = self.sockl.accept()
        self.ts = paramiko.Transport(self.socks)
        host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
        self.ts.add_server_key(host_key)
        server = NullServer()
        self.ts.start_server(self.event, server)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_1_gss_auth(self):
        """
        Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication
        (gssapi-with-mic) in client and server mode.
        """
        host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
                                    'ssh-rsa', public_host_key)
        self.tc.connect(self.hostname, self.port, username=self.username,
                        gss_auth=True)

        self.event.wait(1.0)
        self.assert_(self.event.is_set())
        self.assert_(self.ts.is_active())
        self.assertEquals(self.username, self.ts.get_username())
        self.assertEquals(True, self.ts.is_authenticated())

        stdin, stdout, stderr = self.tc.exec_command('yes')
        schan = self.ts.accept(1.0)

        schan.send('Hello there.\n')
        schan.send_stderr('This is on stderr.\n')
        schan.close()

        self.assertEquals('Hello there.\n', stdout.readline())
        self.assertEquals('', stdout.readline())
        self.assertEquals('This is on stderr.\n', stderr.readline())
        self.assertEquals('', stderr.readline())

        stdin.close()
        stdout.close()
        stderr.close()
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_2_add(self):
        hostdict = paramiko.HostKeys('hostfile.temp')
        hh = '|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c='
        key = paramiko.RSAKey(data=decodebytes(keyblob))
        hostdict.add(hh, 'ssh-rsa', key)
        self.assertEqual(3, len(list(hostdict)))
        x = hostdict['foo.example.com']
        fp = hexlify(x['ssh-rsa'].get_fingerprint()).upper()
        self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fp)
        self.assertTrue(hostdict.check('foo.example.com', key))
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def test_1_gsskex_and_auth(self):
        """
        Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
        Diffie-Hellman Key Exchange and user authentication with the GSS-API
        context created during key exchange.
        """
        host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        self.tc = paramiko.SSHClient()
        self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
                                    'ssh-rsa', public_host_key)
        self.tc.connect(self.hostname, self.port, username=self.username,
                        gss_auth=True, gss_kex=True)

        self.event.wait(1.0)
        self.assert_(self.event.is_set())
        self.assert_(self.ts.is_active())
        self.assertEquals(self.username, self.ts.get_username())
        self.assertEquals(True, self.ts.is_authenticated())

        stdin, stdout, stderr = self.tc.exec_command('yes')
        schan = self.ts.accept(1.0)

        schan.send('Hello there.\n')
        schan.send_stderr('This is on stderr.\n')
        schan.close()

        self.assertEquals('Hello there.\n', stdout.readline())
        self.assertEquals('', stdout.readline())
        self.assertEquals('This is on stderr.\n', stderr.readline())
        self.assertEquals('', stderr.readline())

        stdin.close()
        stdout.close()
        stderr.close()
项目:wetland    作者:ohmyadd    | 项目源码 | 文件源码
def handle(self):
        transport = paramiko.Transport(self.request)

        rsafile = self.server.cfg.get("ssh", "private_rsa")
        dsafile = self.server.cfg.get("ssh", "private_dsa")
        rsakey = paramiko.RSAKey(filename=rsafile)
        dsakey = paramiko.DSSKey(filename=dsafile)
        transport.add_server_key(rsakey)
        transport.add_server_key(dsakey)

        transport.local_version = self.server.cfg.get("ssh", "banner")

        transport.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                        sftpServer.sftp_server)
        nw = network.network(self.client_address[0],
                    self.server.cfg.get("wetland", "docker_addr"))
        nw.create()

        sServer = sshServer.ssh_server(transport=transport, network=nw)

        try:
            transport.start_server(server=sServer)
        except paramiko.SSHException:
            return
        except Exception as e:
            print e
            nw.delete()
            sServer.docker_trans.close()
            return

        try:
            while True:
                chann = transport.accept(60)
                # no channel left
                if not transport._channels.values():
                    break
        except Exception as e:
            print e
        finally:
            nw.delete()
            sServer.docker_trans.close()
项目:shakedown    作者:dcos    | 项目源码 | 文件源码
def get_transport(host, username, key):
    """ Create a transport object

        :param host: the hostname to connect to
        :type host: str
        :param username: SSH username
        :type username: str
        :param key: key object used for authentication
        :type key: paramiko.RSAKey

        :return: a transport object
        :rtype: paramiko.Transport
    """

    if host == shakedown.master_ip():
        transport = paramiko.Transport(host)
    else:
        transport_master = paramiko.Transport(shakedown.master_ip())
        transport_master = start_transport(transport_master, username, key)

        if not transport_master.is_authenticated():
            print("error: unable to authenticate {}@{} with key {}".format(username, shakedown.master_ip(), key_path))
            return False

        try:
            channel = transport_master.open_channel('direct-tcpip', (host, 22), ('127.0.0.1', 0))
        except paramiko.SSHException:
            print("error: unable to connect to {}".format(host))
            return False

        transport = paramiko.Transport(channel)

    return transport
项目:shakedown    作者:dcos    | 项目源码 | 文件源码
def start_transport(transport, username, key):
    """ Begin a transport client and authenticate it

        :param transport: the transport object to start
        :type transport: paramiko.Transport
        :param username: SSH username
        :type username: str
        :param key: key object used for authentication
        :type key: paramiko.RSAKey

        :return: the transport object passed
        :rtype: paramiko.Transport
    """

    transport.start_client()

    agent = paramiko.agent.Agent()
    keys = itertools.chain((key,) if key else (), agent.get_keys())
    for test_key in keys:
        try:
            transport.auth_publickey(username, test_key)
            break
        except paramiko.AuthenticationException as e:
            pass
    else:
        raise ValueError('No valid key supplied')

    return transport


# SSH connection will be auto-terminated at the conclusion of this operation, causing
# a race condition; the try/except block attempts to close the channel and/or transport
# but does not issue a failure if it has already been closed.
项目:BigBrotherBot-For-UrT43    作者:ptitbigorneau    | 项目源码 | 文件源码
def get_private_key(self, password=None):
        if self.private_key_file is not None:
            with open(self.private_key_file, "r") as f:
                key_head = f.readline()
            if 'DSA' in key_head:
                key_type = paramiko.DSSKey
            elif 'RSA' in key_head:
                key_type = paramiko.RSAKey
            else:
                raise ValueError("can't identify private key type")
            with open(self.private_key_file, "r") as f:
                return key_type.from_private_key(f, password=password)
项目:niceman    作者:ReproNim    | 项目源码 | 文件源码
def load_remote_rsa_key(self, remote_filename):
        """
        Returns paramiko.RSAKey object for an RSA key located on the remote
        machine
        """
        rfile = self.remote_file(remote_filename, 'r')
        key = get_rsa_key(key_file_obj=rfile)
        rfile.close()
        return key
项目:niceman    作者:ReproNim    | 项目源码 | 文件源码
def get_rsa_key(key_location=None, key_file_obj=None, passphrase=None,
                use_pycrypto=False):
    key_fobj = key_file_obj or open(key_location)
    try:
        if use_pycrypto:
            key = RSA.importKey(key_fobj, passphrase=passphrase)
        else:
            key = paramiko.RSAKey.from_private_key(key_fobj,
                                                   password=passphrase)
        return key
    except (paramiko.SSHException, ValueError):
        raise exception.SSHError(
            "Invalid RSA private key file or missing passphrase: %s" %
            key_location)
项目:niceman    作者:ReproNim    | 项目源码 | 文件源码
def generate_rsa_key():
    return paramiko.RSAKey.generate(2048)
项目:SameKeyProxy    作者:xzhou    | 项目源码 | 文件源码
def configure_client_socket(self):

        self.log.info("SSHProtocol: ccs: configuring client socket")

        host_key = paramiko.RSAKey(filename='test_rsa.key')

        self.sourcet = paramiko.Transport(self.source)
        try:
            self.sourcet.load_server_moduli()
        except:
            self.log.error("SSHProtocol: Failed to load moduli -- gex " \
                           "will be unsupported.)")
            raise
        self.sourcet.add_server_key(host_key)



        # Initialize the SSH connection to the server
        # BUGFIX: client must init before the server start. A race condition
        # was created between the server and the client. If the client can
        # get credentials to SSHServer before client socket config 
        # this method there will be an error.
        self.log.info("SSHProtocol: initializing client to server SSH")
        self.init_ssh_client()

        # Start up the SSH server.                     
        server = SSHServer(self)                
        try:
            self.sourcet.start_server(server=server)
        except paramiko.SSHException, x:
            self.log.error("SSHProtocol: SSH negotiation failed.")
            return

        # wait for auth
        chan = self.sourcet.accept(20)
        if chan is None:
            self.log.error("*** No channel.")
            sys.exit(1)
        self.log.info("SSHProtocol: User authenticated!")


        server.event.wait(10)
        if not server.event.isSet():
            self.log.warn("SSHProtocol: client never asked for a anything.")
            return

        chan.send('\r\n\r\nThings just got real.\r\n\r\n')

        self.sourceschan = chan
        self.log.debug("SSHProtocol: finished configuring client socket")

        self.destschan = self.destt.open_session()
        self.destschan.get_pty()
        self.destschan.invoke_shell()

#    chan = t.open_session()
#    chan.get_pty()
#    chan.invoke_shell()

        #chan.close()
        #self.sourcet.close()
项目:netconf-proxy    作者:fortinet-solutions-cse    | 项目源码 | 文件源码
def get_user_auth_keys (self, username):
        """Parse the users's authorized_keys file if any to look for authorized keys"""
        if username in self.users_keys:
            return self.users_keys[username]

        self.users_keys[username] = []

        userdir = os.path.expanduser("~" + username)
        if not userdir:
            return self.users_keys[username]

        keyfile = os.path.join(userdir, ".ssh/authorized_keys")
        if not keyfile or not os.path.exists(keyfile):
            return self.users_keys[username]

        with open(keyfile) as f:
            for line in f.readlines():
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                values = [ x.strip() for x in line.split() ]

                exp = None
                try:
                    int(values[0])      # bits value?
                except ValueError:
                    # Type 1 or type 2, type 1 is bits in second value
                    options_ktype = values[0]
                    try:
                        int(values[1])  # bits value?
                    except ValueError:
                        # type 2 with options
                        ktype = options_ktype
                        data = values[1]
                    else:
                        # Type 1 no options.
                        exp = int(values[1])
                        data = values[2]
                else:
                    # Type 1 no options.
                    exp = int(values[1])
                    data = values[2]

                # XXX For now skip type 1 keys
                if exp is not None:
                    continue

                if data:
                    import base64
                    if ktype == "ssh-rsa":
                        key = ssh.RSAKey(data=base64.decodebytes(data.encode('ascii')))
                    elif ktype == "ssh-dss":
                        key = ssh.DSSKey(data=base64.decodebytes(data.encode('ascii')))
                    else:
                        key = None
                    if key:
                        self.users_keys[username].append(key)
        return self.users_keys[username]
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def _test_connection(self, **kwargs):
        """
        (Most) kwargs get passed directly into SSHClient.connect().

        The exception is ``allowed_keys`` which is stripped and handed to the
        ``NullServer`` used for testing.
        """
        run_kwargs = {'allowed_keys': kwargs.pop('allowed_keys', None)}
        # Server setup
        threading.Thread(target=self._run, kwargs=run_kwargs).start()
        host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
        public_host_key = paramiko.RSAKey(data=host_key.asbytes())

        # Client setup
        self.tc = paramiko.SSHClient()
        self.tc.get_host_keys().add('[%s]:%d' % (self.addr, self.port), 'ssh-rsa', public_host_key)

        # Actual connection
        self.tc.connect(self.addr, self.port, username='slowdive', **kwargs)

        # Authentication successful?
        self.event.wait(1.0)
        self.assertTrue(self.event.is_set())
        self.assertTrue(self.ts.is_active())
        self.assertEqual('slowdive', self.ts.get_username())
        self.assertEqual(True, self.ts.is_authenticated())

        # Command execution functions?
        stdin, stdout, stderr = self.tc.exec_command('yes')
        schan = self.ts.accept(1.0)

        schan.send('Hello there.\n')
        schan.send_stderr('This is on stderr.\n')
        schan.close()

        self.assertEqual('Hello there.\n', stdout.readline())
        self.assertEqual('', stdout.readline())
        self.assertEqual('This is on stderr.\n', stderr.readline())
        self.assertEqual('', stderr.readline())

        # Cleanup
        stdin.close()
        stdout.close()
        stderr.close()