我们从 Python 开源项目中提取了以下 33 个代码示例，用于说明如何使用 paramiko.RSAKey()。
def from_string(cls, key_string):
    """Build a paramiko key object from private-key text.

    The text is offered to ``paramiko.RSAKey`` first and, if that raises
    ``paramiko.SSHException``, to ``paramiko.DSSKey``.

    :param key_string: private key material as a string
    :return: the first key object that parses, or ``None`` when both
        key classes reject the text
    """
    for key_class in (paramiko.RSAKey, paramiko.DSSKey):
        try:
            # Each attempt gets its own StringIO, so it reads from offset 0.
            return key_class(file_obj=StringIO(key_string))
        except paramiko.SSHException:
            continue
    return None
def ssh_key_gen(length=2048, type='rsa', password=None,
                username='jumpserver', hostname=None):
    """Generate an SSH private/public key pair with paramiko.

    :param length: key size in bits, passed to ``generate``
    :param type: ``'rsa'`` or ``'dsa'``
    :param password: optional passphrase used when serialising the
        private key
    :param username: passed to ``ssh_pubkey_gen``
    :param hostname: passed to ``ssh_pubkey_gen``; defaults to
        ``os.uname()[1]``
    :return: tuple ``(private_key_str, public_key_str)``
    :raises IOError: if ``type`` is not supported, or if an ``IOError``
        occurs while generating/serialising the key
    """
    # Validate before entering the try block; otherwise the handler below
    # would replace this specific message with the generic one.
    if type not in ('rsa', 'dsa'):
        raise IOError('SSH private key must be `rsa` or `dsa`')

    if hostname is None:
        hostname = os.uname()[1]

    f = StringIO()
    try:
        if type == 'rsa':
            private_key_obj = paramiko.RSAKey.generate(length)
        else:
            private_key_obj = paramiko.DSSKey.generate(length)
        private_key_obj.write_private_key(f, password=password)
        private_key = f.getvalue()
        public_key = ssh_pubkey_gen(private_key_obj, username=username,
                                    hostname=hostname)
        return private_key, public_key
    except IOError as exc:
        # Same exception type as before, but keep the underlying cause
        # visible to the caller.
        raise IOError('There is an error when generating ssh key: %s' % exc)
def test_4_auto_add_policy(self):
    """
    Check that a client using AutoAddPolicy records the server's host key.
    """
    threading.Thread(target=self._run).start()
    private_host_key = paramiko.RSAKey.from_private_key_file(
        test_path('test_rsa.key'))
    expected_key = paramiko.RSAKey(data=private_host_key.asbytes())

    self.tc = paramiko.SSHClient()
    self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # Nothing is known about any host before connecting.
    self.assertEqual(0, len(self.tc.get_host_keys()))
    self.tc.connect(self.addr, self.port,
                    username='slowdive', password='pygmalion')

    self.event.wait(1.0)
    self.assertTrue(self.event.is_set())
    self.assertTrue(self.ts.is_active())
    self.assertEqual('slowdive', self.ts.get_username())
    self.assertEqual(True, self.ts.is_authenticated())

    # Exactly one host entry must have been added, holding the server key.
    self.assertEqual(1, len(self.tc.get_host_keys()))
    host_id = '[%s]:%d' % (self.addr, self.port)
    recorded_key = self.tc.get_host_keys()[host_id]['ssh-rsa']
    self.assertEqual(expected_key, recorded_key)
def test_7_banner_timeout(self):
    """
    Check that SSHClient.connect honours the ``banner_timeout`` argument.
    """
    # The server thread is started with a 1 second delay.
    server_thread = threading.Thread(target=self._run, kwargs={'delay': 1})
    server_thread.start()

    private_host_key = paramiko.RSAKey.from_private_key_file(
        test_path('test_rsa.key'))
    known_key = paramiko.RSAKey(data=private_host_key.asbytes())

    self.tc = paramiko.SSHClient()
    host_id = '[%s]:%d' % (self.addr, self.port)
    self.tc.get_host_keys().add(host_id, 'ssh-rsa', known_key)

    # A half-second banner timeout is shorter than the server delay, so
    # connecting is expected to raise.
    self.assertRaises(
        paramiko.SSHException,
        self.tc.connect,
        self.addr,
        self.port,
        username='slowdive',
        password='pygmalion',
        banner_timeout=0.5
    )
def test_4_dict_set(self):
    """Check dict-style assignment of host keys on a HostKeys object."""
    hostdict = paramiko.HostKeys('hostfile.temp')
    rsa_key = paramiko.RSAKey(data=decodebytes(keyblob))
    dss_key = paramiko.DSSKey(data=decodebytes(keyblob_dss))

    hostdict['secure.example.com'] = {
        'ssh-rsa': rsa_key,
        'ssh-dss': dss_key
    }
    hostdict['fake.example.com'] = {}
    hostdict['fake.example.com']['ssh-rsa'] = rsa_key

    self.assertEqual(3, len(hostdict))
    # Number of key types stored per entry, in iteration order.
    entries = list(hostdict.values())
    self.assertEqual(2, len(entries[0]))
    self.assertEqual(1, len(entries[1]))
    self.assertEqual(1, len(entries[2]))

    expected_fingerprints = (
        ('ssh-rsa', b'7EC91BB336CB6D810B124B1353C32396'),
        ('ssh-dss', b'4478F0B9A23CC5182009FF755BC1D26C'),
    )
    for key_type, expected in expected_fingerprints:
        stored = hostdict['secure.example.com'][key_type]
        fp = hexlify(stored.get_fingerprint()).upper()
        self.assertEqual(expected, fp)
def validate_key(key_path):
    """ Load the private key stored at ``key_path``

        :param key_path: path to a key to use for authentication
            (``~`` is expanded)
        :type key_path: str

        :return: key object used for authentication, or ``False`` when the
            expanded path is not an existing file
        :rtype: paramiko.RSAKey | bool
    """
    expanded_path = os.path.expanduser(key_path)
    if os.path.isfile(expanded_path):
        return paramiko.RSAKey.from_private_key_file(expanded_path)
    return False
def generate_key(bits):
    """Generate a paramiko RSAKey of ``bits`` bits via ``RSA.generate``."""
    # NOTE(dims): pycryptodome has changed the signature of the RSA.generate
    # call. specifically progress_func has been dropped. paramiko still uses
    # pycrypto. However some projects like latest pysaml2 have switched from
    # pycrypto to pycryptodome as pycrypto seems to have been abandoned.
    # paramiko project has started transition to pycryptodome as well but
    # there is no release yet with that support. So at the moment depending on
    # which version of pysaml2 is installed, Nova is likely to break. So we
    # call "RSA.generate(bits)" which works on both pycrypto and pycryptodome
    # and then wrap it into a paramiko.RSAKey
    generated = RSA.generate(bits)
    wrapped = paramiko.RSAKey(vals=(generated.e, generated.n))
    # Copy the private components onto the paramiko key object.
    for component in ('d', 'p', 'q'):
        setattr(wrapped, component, getattr(generated, component))
    return wrapped
def setup_travis():
    """Create an SSH key pair and authorize it when running as user "travis".

    Does nothing (returns ``None``) unless the current user, taken from the
    ``USER`` environment variable or ``getpass.getuser()`` as a fallback, is
    ``"travis"``, and also does nothing when ``$HOME/.ssh/id_rsa`` already
    exists.  Otherwise it generates a 1024-bit RSA key, stores it in the
    module-level ``private_key``, writes it to ``$HOME/.ssh/id_rsa`` and
    appends the public half to ``$HOME/.ssh/authorized_keys``.
    """
    import getpass
    import sys
    global private_key  # pylint: disable=W0603

    logging.basicConfig(level=logging.DEBUG, stream=sys.stderr)
    print("Setup called.")
    if 'USER' in os.environ:
        if os.environ['USER'] != "travis":
            return
    else:
        if getpass.getuser() != "travis":
            return

    print("Executing under Travis-CI")
    ssh_dir = "{}/.ssh".format(os.environ['HOME'])
    priv_filename = os.path.join(ssh_dir, "id_rsa")
    if os.path.exists(priv_filename):
        logger.error("Found private keyfile")
        print("Found private keyfile")
        return

    logger.error("Creating ssh dir " + ssh_dir)
    print("Creating ssh dir " + ssh_dir)
    # Create the directory directly instead of running "mkdir -p" through a
    # shell: no quoting problems with unusual paths, and a failure raises
    # here instead of being silently ignored.
    if not os.path.isdir(ssh_dir):
        os.makedirs(ssh_dir)

    priv = ssh.RSAKey.generate(bits=1024)
    private_key = priv

    logger.error("Generating private keyfile " + priv_filename)
    print("Generating private keyfile " + priv_filename)
    priv.write_private_key_file(filename=priv_filename)

    # Re-read the key from disk to obtain the public part that gets
    # authorized.
    pub = ssh.RSAKey(filename=priv_filename)
    auth_filename = os.path.join(ssh_dir, "authorized_keys")
    logger.error("Adding keys to authorized_keys file " + auth_filename)
    print("Adding keys to authorized_keys file " + auth_filename)
    with open(auth_filename, "a") as authfile:
        authfile.write("{} {}\n".format(pub.get_name(), pub.get_base64()))

    logger.error("Done generating keys")
    print("Done generating keys")
def ssh_key_string_to_obj(text):
    """Convert private-key text into a paramiko key object.

    The text is parsed as an RSA key first, then as a DSS key.

    :param text: private key material as a string
    :return: ``paramiko.RSAKey`` or ``paramiko.DSSKey`` instance, or
        ``None`` if neither parser accepts the text
    """
    key_f = StringIO(text)
    for key_class in (paramiko.RSAKey, paramiko.DSSKey):
        # Rewind before every attempt: a failed parse leaves the stream at
        # EOF, so without this the DSS parser would only ever see an empty
        # file and DSS keys could never be loaded.
        key_f.seek(0)
        try:
            return key_class.from_private_key(key_f)
        except paramiko.SSHException:
            continue
    return None
def get_host_key(cls):
    """Return the RSA host key stored at ``cls.host_key_path``.

    When no file exists at that path, ``cls.host_key_gen()`` is called
    first.
    """
    logger.debug("Get ssh server host key")
    key_file_present = os.path.isfile(cls.host_key_path)
    if not key_file_present:
        cls.host_key_gen()
    host_key = paramiko.RSAKey(filename=cls.host_key_path)
    return host_key
def __init__(self, server, client_conn):
    """Set up a paramiko server transport for an accepted connection.

    :param server: stored unchanged on ``self.server``
    :param client_conn: 2-item sequence; its first item is handed to
        ``paramiko.Transport`` and the second is ignored
    """
    self.server = server
    self.thread = None
    self.command_queues = {}

    sock, _peer = client_conn
    transport = paramiko.Transport(sock)
    self.transport = transport

    # Host key for the server side, then the SFTP subsystem handler.
    server_key = paramiko.RSAKey(filename=SERVER_KEY_PATH)
    transport.add_server_key(server_key)
    transport.set_subsystem_handler("sftp", sftp.SFTPServer)
def add_user(self, uid, private_key_path):
    """Register ``uid`` with the RSA private key found at ``private_key_path``.

    The entry stored in ``self._users`` is ``(private_key_path, key_object)``.
    """
    loaded_key = paramiko.RSAKey.from_private_key_file(private_key_path)
    entry = (private_key_path, loaded_key)
    self._users[uid] = entry
def client(self, uid):
    """Return a connected ``paramiko.SSHClient`` authenticated as ``uid``.

    The key loaded from ``SERVER_KEY_PATH`` is registered as the known
    ``ssh-rsa`` host key under both ``host`` and ``[host]:port``, and any
    other host key is rejected.  Authentication uses only the private key
    file registered for ``uid`` (no agent, no key discovery).
    """
    key_path, _key = self._users[uid]

    ssh_client = paramiko.SSHClient()
    known_hosts = ssh_client.get_host_keys()
    server_key = paramiko.RSAKey.from_private_key_file(SERVER_KEY_PATH)
    for host_id in (self.host, "[%s]:%d" % (self.host, self.port)):
        known_hosts.add(host_id, "ssh-rsa", server_key)

    ssh_client.set_missing_host_key_policy(paramiko.RejectPolicy())
    ssh_client.connect(
        hostname=self.host,
        port=self.port,
        username=uid,
        key_filename=key_path,
        allow_agent=False,
        look_for_keys=False,
    )
    return ssh_client
def generate_keys():
    """Generate a 1024-bit RSA key pair.

    :return: dict with ``'private'`` (the serialised private key text) and
        ``'public'`` (the base64 form returned by ``get_base64()``)
    """
    buf = StringIO.StringIO()
    rsa_key = paramiko.RSAKey.generate(1024)
    rsa_key.write_private_key(buf)
    pair = {
        'private': buf.getvalue(),
        'public': rsa_key.get_base64(),
    }
    buf.close()
    return pair
def load_keyfile(file_path):
    """Read an RSA private key file.

    :param file_path: path of the private key file
    :return: dict with ``'private'`` (the file content) and ``'public'``
        (the base64 form returned by ``get_base64()``)
    """
    with open(file_path, 'r') as handle:
        private_text = handle.read()
        rsa_key = paramiko.RSAKey(file_obj=StringIO.StringIO(private_text))
        public_text = rsa_key.get_base64()
    return {'private': private_text, 'public': public_text}
def dump_keyfile(file_path, key):
    """Write the private key held in ``key['private']`` to ``file_path``.

    :param file_path: destination path
    :param key: mapping whose ``'private'`` entry is the private key text
    """
    private_stream = StringIO.StringIO(key['private'])
    rsa_key = paramiko.RSAKey(file_obj=private_stream)
    rsa_key.write_private_key_file(file_path)
    # NOTE(review): 0o644 leaves the private key readable by every local
    # user -- confirm this is intended before tightening it.
    os.chmod(file_path, 0o644)
def _run(self, allowed_keys=None, delay=0):
    """Accept one connection on ``self.sockl`` and run an SSH server on it.

    :param allowed_keys: handed to ``NullServer``; defaults to
        ``FINGERPRINTS.keys()``
    :param delay: seconds to sleep before starting the server (skipped
        when falsy)
    """
    keys = FINGERPRINTS.keys() if allowed_keys is None else allowed_keys

    self.socks, _addr = self.sockl.accept()
    self.ts = paramiko.Transport(self.socks)
    self.ts.add_server_key(
        paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')))

    null_server = NullServer(allowed_keys=keys)
    if delay:
        time.sleep(delay)
    self.ts.start_server(self.event, null_server)
def test_5_save_host_keys(self):
    """
    verify that SSHClient correctly saves a known_hosts file.
    """
    warnings.filterwarnings('ignore', 'tempnam.*')

    host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key'))
    public_host_key = paramiko.RSAKey(data=host_key.asbytes())
    fd, localname = mkstemp()
    os.close(fd)

    try:
        client = paramiko.SSHClient()
        # assertEqual, not the assertEquals alias: the alias was removed
        # from unittest in Python 3.12.
        self.assertEqual(0, len(client.get_host_keys()))

        host_id = '[%s]:%d' % (self.addr, self.port)

        client.get_host_keys().add(host_id, 'ssh-rsa', public_host_key)
        self.assertEqual(1, len(client.get_host_keys()))
        self.assertEqual(public_host_key,
                         client.get_host_keys()[host_id]['ssh-rsa'])

        client.save_host_keys(localname)

        with open(localname) as saved:
            self.assertIn(host_id, saved.read())
    finally:
        # Remove the temporary file even when an assertion above fails.
        os.unlink(localname)
def test_6_cleanup(self): """ verify that when an SSHClient is collected, its transport (and the transport's packetizer) is closed. """ # Unclear why this is borked on Py3, but it is, and does not seem worth # pursuing at the moment. # XXX: It's the release of the references to e.g packetizer that fails # in py3... if not PY2: return threading.Thread(target=self._run).start() host_key = paramiko.RSAKey.from_private_key_file(test_path('test_rsa.key')) public_host_key = paramiko.RSAKey(data=host_key.asbytes()) self.tc = paramiko.SSHClient() self.tc.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.assertEqual(0, len(self.tc.get_host_keys())) self.tc.connect(self.addr, self.port, username='slowdive', password='pygmalion') self.event.wait(1.0) self.assertTrue(self.event.is_set()) self.assertTrue(self.ts.is_active()) p = weakref.ref(self.tc._transport.packetizer) self.assertTrue(p() is not None) self.tc.close() del self.tc # hrm, sometimes p isn't cleared right away. why is that? #st = time.time() #while (time.time() - st < 5.0) and (p() is not None): # time.sleep(0.1) # instead of dumbly waiting for the GC to collect, force a collection # to see whether the SSHClient object is deallocated correctly import gc gc.collect() self.assertTrue(p() is None)
def _run(self):
    """Accept one connection on ``self.sockl`` and run an SSH server on it.

    The server transport is stored on ``self.ts`` and uses the RSA host
    key from ``tests/test_rsa.key``.
    """
    self.socks, _addr = self.sockl.accept()
    self.ts = paramiko.Transport(self.socks)
    self.ts.add_server_key(
        paramiko.RSAKey.from_private_key_file('tests/test_rsa.key'))
    self.ts.start_server(self.event, NullServer())
def test_1_gss_auth(self):
    """
    Verify that Paramiko can handle SSHv2 GSS-API / SSPI authentication
    (gssapi-with-mic) in client and server mode.
    """
    host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
    public_host_key = paramiko.RSAKey(data=host_key.asbytes())

    self.tc = paramiko.SSHClient()
    self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
                                'ssh-rsa', public_host_key)
    self.tc.connect(self.hostname, self.port, username=self.username,
                    gss_auth=True)

    # assertTrue/assertEqual replace the assert_/assertEquals aliases,
    # which were removed from unittest in Python 3.12.
    self.event.wait(1.0)
    self.assertTrue(self.event.is_set())
    self.assertTrue(self.ts.is_active())
    self.assertEqual(self.username, self.ts.get_username())
    self.assertEqual(True, self.ts.is_authenticated())

    stdin, stdout, stderr = self.tc.exec_command('yes')
    # Server side of the channel opened by exec_command.
    schan = self.ts.accept(1.0)

    schan.send('Hello there.\n')
    schan.send_stderr('This is on stderr.\n')
    schan.close()

    self.assertEqual('Hello there.\n', stdout.readline())
    self.assertEqual('', stdout.readline())
    self.assertEqual('This is on stderr.\n', stderr.readline())
    self.assertEqual('', stderr.readline())

    stdin.close()
    stdout.close()
    stderr.close()
def test_2_add(self):
    """Check that a key added under a hashed host entry can be looked up."""
    hostdict = paramiko.HostKeys('hostfile.temp')
    hashed_host = '|1|BMsIC6cUIP2zBuXR3t2LRcJYjzM=|hpkJMysjTk/+zzUUzxQEa2ieq6c='
    rsa_key = paramiko.RSAKey(data=decodebytes(keyblob))
    hostdict.add(hashed_host, 'ssh-rsa', rsa_key)

    self.assertEqual(3, len(list(hostdict)))

    # The entry is expected to be reachable under the plain host name.
    entry = hostdict['foo.example.com']
    fingerprint = hexlify(entry['ssh-rsa'].get_fingerprint()).upper()
    self.assertEqual(b'7EC91BB336CB6D810B124B1353C32396', fingerprint)
    self.assertTrue(hostdict.check('foo.example.com', rsa_key))
def test_1_gsskex_and_auth(self):
    """
    Verify that Paramiko can handle SSHv2 GSS-API / SSPI authenticated
    Diffie-Hellman Key Exchange and user authentication with the GSS-API
    context created during key exchange.
    """
    host_key = paramiko.RSAKey.from_private_key_file('tests/test_rsa.key')
    public_host_key = paramiko.RSAKey(data=host_key.asbytes())

    self.tc = paramiko.SSHClient()
    self.tc.get_host_keys().add('[%s]:%d' % (self.hostname, self.port),
                                'ssh-rsa', public_host_key)
    self.tc.connect(self.hostname, self.port, username=self.username,
                    gss_auth=True, gss_kex=True)

    # assertTrue/assertEqual replace the assert_/assertEquals aliases,
    # which were removed from unittest in Python 3.12.
    self.event.wait(1.0)
    self.assertTrue(self.event.is_set())
    self.assertTrue(self.ts.is_active())
    self.assertEqual(self.username, self.ts.get_username())
    self.assertEqual(True, self.ts.is_authenticated())

    stdin, stdout, stderr = self.tc.exec_command('yes')
    # Server side of the channel opened by exec_command.
    schan = self.ts.accept(1.0)

    schan.send('Hello there.\n')
    schan.send_stderr('This is on stderr.\n')
    schan.close()

    self.assertEqual('Hello there.\n', stdout.readline())
    self.assertEqual('', stdout.readline())
    self.assertEqual('This is on stderr.\n', stderr.readline())
    self.assertEqual('', stderr.readline())

    stdin.close()
    stdout.close()
    stderr.close()
def handle(self):
    """Serve one incoming connection as an SSH server session.

    Builds a paramiko transport on ``self.request`` with the RSA and DSA
    host keys and the banner from the server config, creates the network
    object for the client address, starts the SSH server and then keeps
    accepting channels until the transport has none left.
    """
    transport = paramiko.Transport(self.request)

    rsafile = self.server.cfg.get("ssh", "private_rsa")
    dsafile = self.server.cfg.get("ssh", "private_dsa")
    rsakey = paramiko.RSAKey(filename=rsafile)
    dsakey = paramiko.DSSKey(filename=dsafile)
    transport.add_server_key(rsakey)
    transport.add_server_key(dsakey)

    transport.local_version = self.server.cfg.get("ssh", "banner")

    transport.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                    sftpServer.sftp_server)

    nw = network.network(self.client_address[0],
                         self.server.cfg.get("wetland", "docker_addr"))
    nw.create()

    sServer = sshServer.ssh_server(transport=transport, network=nw)

    try:
        transport.start_server(server=sServer)
    except paramiko.SSHException:
        # NOTE(review): this path returns without nw.delete() /
        # docker_trans.close(), unlike the generic handler below --
        # confirm whether that is intentional.
        return
    except Exception as e:
        # print(e) with a single argument behaves the same on Python 2
        # and Python 3, unlike the former print statement.
        print(e)
        nw.delete()
        sServer.docker_trans.close()
        return

    try:
        while True:
            # Wait up to 60 seconds for a channel; the returned channel
            # itself is not used here.
            transport.accept(60)
            # no channel left
            if not transport._channels.values():
                break
    except Exception as e:
        print(e)
    finally:
        nw.delete()
        sServer.docker_trans.close()
def get_transport(host, username, key):
    """ Create a transport object

        When ``host`` is the master, the transport connects to it directly.
        Otherwise a transport to the master is started and authenticated
        first, and the returned transport runs over a ``direct-tcpip``
        channel from the master to port 22 of ``host``.

        :param host: the hostname to connect to
        :type host: str
        :param username: SSH username
        :type username: str
        :param key: key object used for authentication
        :type key: paramiko.RSAKey

        :return: a transport object, or ``False`` when authenticating with
            the master or opening the channel to ``host`` fails
        :rtype: paramiko.Transport | bool
    """
    master = shakedown.master_ip()

    if host == master:
        return paramiko.Transport(host)

    transport_master = paramiko.Transport(master)
    transport_master = start_transport(transport_master, username, key)

    if not transport_master.is_authenticated():
        # Report the key object that was passed in; the name used here
        # previously (key_path) is not defined in this function, so this
        # branch raised NameError instead of returning False.
        print("error: unable to authenticate {}@{} with key {}".format(
            username, master, key))
        return False

    try:
        channel = transport_master.open_channel(
            'direct-tcpip', (host, 22), ('127.0.0.1', 0))
    except paramiko.SSHException:
        print("error: unable to connect to {}".format(host))
        return False

    return paramiko.Transport(channel)
def start_transport(transport, username, key):
    """ Start a transport in client mode and authenticate it

        ``key`` (when truthy) is tried first, followed by every key offered
        by the SSH agent; the first key accepted ends the search.

        :param transport: the transport object to start
        :type transport: paramiko.Transport
        :param username: SSH username
        :type username: str
        :param key: key object used for authentication
        :type key: paramiko.RSAKey

        :return: the transport object passed
        :rtype: paramiko.Transport
        :raises ValueError: when every candidate key is rejected
    """
    transport.start_client()

    ssh_agent = paramiko.agent.Agent()
    supplied = (key,) if key else ()
    for candidate in itertools.chain(supplied, ssh_agent.get_keys()):
        try:
            transport.auth_publickey(username, candidate)
        except paramiko.AuthenticationException:
            # Rejected -- move on to the next candidate.
            continue
        break
    else:
        raise ValueError('No valid key supplied')

    return transport


# SSH connection will be auto-terminated at the conclusion of this operation, causing
# a race condition; the try/except block attempts to close the channel and/or transport
# but does not issue a failure if it has already been closed.
def get_private_key(self, password=None):
    """Load the private key named by ``self.private_key_file``.

    The key class is picked from the first line of the file: ``'DSA'``
    selects ``paramiko.DSSKey``, otherwise ``'RSA'`` selects
    ``paramiko.RSAKey``.

    :param password: passphrase passed to ``from_private_key``
    :return: the loaded key, or ``None`` when no key file is configured
    :raises ValueError: when the first line names neither key type
    """
    if self.private_key_file is None:
        return None

    with open(self.private_key_file, "r") as key_stream:
        header = key_stream.readline()
        if 'DSA' in header:
            loader = paramiko.DSSKey
        elif 'RSA' in header:
            loader = paramiko.RSAKey
        else:
            raise ValueError("can't identify private key type")
        # Hand the parser the whole file, starting from the header line.
        key_stream.seek(0)
        return loader.from_private_key(key_stream, password=password)
def load_remote_rsa_key(self, remote_filename):
    """
    Returns paramiko.RSAKey object for an RSA key located on the remote
    machine

    :param remote_filename: path of the key file on the remote machine,
        opened for reading through ``self.remote_file``
    """
    rfile = self.remote_file(remote_filename, 'r')
    try:
        return get_rsa_key(key_file_obj=rfile)
    finally:
        # Close the remote handle even when the key cannot be parsed.
        rfile.close()
def get_rsa_key(key_location=None, key_file_obj=None, passphrase=None,
                use_pycrypto=False):
    """Load an RSA private key from a path or from an open file object.

    :param key_location: path of the key file; used only when
        ``key_file_obj`` is not given
    :param key_file_obj: already-open file object holding the key; it is
        left open for the caller to close
    :param passphrase: passphrase for an encrypted key
    :param use_pycrypto: when true, parse with ``RSA.importKey`` instead of
        ``paramiko.RSAKey.from_private_key``
    :return: the parsed key object
    :raises exception.SSHError: when the key is invalid or the passphrase
        is missing/wrong
    """
    # Only a file opened here is closed here.
    opened_here = not key_file_obj
    key_fobj = key_file_obj or open(key_location)
    try:
        if use_pycrypto:
            # importKey expects the key text, not a file object.
            key = RSA.importKey(key_fobj.read(), passphrase=passphrase)
        else:
            key = paramiko.RSAKey.from_private_key(key_fobj,
                                                   password=passphrase)
        return key
    except (paramiko.SSHException, ValueError):
        raise exception.SSHError(
            "Invalid RSA private key file or missing passphrase: %s" %
            key_location)
    finally:
        if opened_here:
            key_fobj.close()
def generate_rsa_key():
    """Return a freshly generated 2048-bit ``paramiko.RSAKey``."""
    new_key = paramiko.RSAKey.generate(2048)
    return new_key
def configure_client_socket(self):
    """Run the SSH server side on ``self.source`` and hook up both channels.

    Steps, in order: create the server transport with the host key from
    ``test_rsa.key``, initialise the outgoing SSH client
    (``init_ssh_client``), start the SSH server, wait for an authenticated
    channel and for the server event, then open a session with a pty and a
    shell on ``self.destt``.  Returns early (``None``) when SSH negotiation
    fails or the server event is never set; exits the process when no
    channel arrives.
    """
    self.log.info("SSHProtocol: ccs: configuring client socket")
    host_key = paramiko.RSAKey(filename='test_rsa.key')
    self.sourcet = paramiko.Transport(self.source)
    try:
        self.sourcet.load_server_moduli()
    except:
        # Bare except kept on purpose: the error is logged and re-raised
        # unchanged.
        self.log.error("SSHProtocol: Failed to load moduli -- gex " \
                       "will be unsupported.)")
        raise
    self.sourcet.add_server_key(host_key)

    # Initialize the SSH connection to the server
    # BUGFIX: client must init before the server start. A race condition
    # was created between the server and the client. If the client can
    # get credentials to SSHServer before client socket config
    # this method there will be an error.
    self.log.info("SSHProtocol: initializing client to server SSH")
    self.init_ssh_client()

    # Start up the SSH server.
    server = SSHServer(self)
    try:
        self.sourcet.start_server(server=server)
    except paramiko.SSHException:
        # "except X:" is valid on both Python 2 and 3; the former
        # "except X, x:" form is Python-2-only and x was never used.
        self.log.error("SSHProtocol: SSH negotiation failed.")
        return

    # wait for auth
    chan = self.sourcet.accept(20)
    if chan is None:
        self.log.error("*** No channel.")
        sys.exit(1)

    self.log.info("SSHProtocol: User authenticated!")

    server.event.wait(10)
    if not server.event.isSet():
        self.log.warn("SSHProtocol: client never asked for a anything.")
        return

    chan.send('\r\n\r\nThings just got real.\r\n\r\n')
    self.sourceschan = chan
    self.log.debug("SSHProtocol: finished configuring client socket")

    self.destschan = self.destt.open_session()
    self.destschan.get_pty()
    self.destschan.invoke_shell()
    # chan = t.open_session()
    # chan.get_pty()
    # chan.invoke_shell()

    #chan.close()
    #self.sourcet.close()
def get_user_auth_keys(self, username):
    """Return the authorized public keys of ``username``.

    Parses ``~username/.ssh/authorized_keys`` (if it exists) and caches the
    result in ``self.users_keys``; later calls for the same user return the
    cached list.  Only entries whose first field is ``ssh-rsa`` or
    ``ssh-dss`` produce a key.  Type 1 entries (those with a numeric bits
    field), entries with too few fields and entries whose key data is not
    valid base64 are skipped.

    :param username: account name whose authorized keys are wanted
    :return: list of key objects (possibly empty)
    """
    import base64

    if username in self.users_keys:
        return self.users_keys[username]

    self.users_keys[username] = []

    userdir = os.path.expanduser("~" + username)
    if not userdir:
        return self.users_keys[username]

    keyfile = os.path.join(userdir, ".ssh/authorized_keys")
    if not keyfile or not os.path.exists(keyfile):
        return self.users_keys[username]

    def _is_int(token):
        # True when the field parses as an integer (a type 1 "bits" field).
        try:
            int(token)
        except ValueError:
            return False
        return True

    with open(keyfile) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            values = line.split()
            # A usable entry needs at least "<type> <data>".  A numeric
            # first or second field marks a type 1 key, which is skipped
            # for now.
            if len(values) < 2 or _is_int(values[0]) or _is_int(values[1]):
                continue

            ktype, data = values[0], values[1]
            if ktype == "ssh-rsa":
                key_class = ssh.RSAKey
            elif ktype == "ssh-dss":
                key_class = ssh.DSSKey
            else:
                continue

            try:
                blob = base64.decodebytes(data.encode('ascii'))
            except ValueError:
                # Not valid base64 / not ASCII: ignore this entry rather
                # than failing the whole lookup.
                continue

            key = key_class(data=blob)
            if key:
                self.users_keys[username].append(key)
    return self.users_keys[username]
def _test_connection(self, **kwargs):
    """
    Connect a client to the test server and exercise one command.

    All kwargs except ``allowed_keys`` are forwarded to
    SSHClient.connect(); ``allowed_keys`` is removed and given to the
    server thread (``self._run``) instead.
    """
    allowed = kwargs.pop('allowed_keys', None)

    # Server setup
    server_thread = threading.Thread(target=self._run,
                                     kwargs={'allowed_keys': allowed})
    server_thread.start()
    private_host_key = paramiko.RSAKey.from_private_key_file(
        test_path('test_rsa.key'))
    known_key = paramiko.RSAKey(data=private_host_key.asbytes())

    # Client setup
    self.tc = paramiko.SSHClient()
    host_id = '[%s]:%d' % (self.addr, self.port)
    self.tc.get_host_keys().add(host_id, 'ssh-rsa', known_key)

    # Actual connection
    self.tc.connect(self.addr, self.port, username='slowdive', **kwargs)

    # Authentication successful?
    self.event.wait(1.0)
    self.assertTrue(self.event.is_set())
    self.assertTrue(self.ts.is_active())
    self.assertEqual('slowdive', self.ts.get_username())
    self.assertEqual(True, self.ts.is_authenticated())

    # Command execution functions?
    stdin, stdout, stderr = self.tc.exec_command('yes')
    server_chan = self.ts.accept(1.0)

    server_chan.send('Hello there.\n')
    server_chan.send_stderr('This is on stderr.\n')
    server_chan.close()

    self.assertEqual('Hello there.\n', stdout.readline())
    self.assertEqual('', stdout.readline())
    self.assertEqual('This is on stderr.\n', stderr.readline())
    self.assertEqual('', stderr.readline())

    # Cleanup
    for stream in (stdin, stdout, stderr):
        stream.close()