我们从 Python 开源项目中提取了以下 7 个代码示例,用于说明如何使用 paramiko.DSSKey()。
def from_string(cls, key_string):
    """Build a paramiko key object from private-key text.

    The text is offered to ``paramiko.RSAKey`` first and to
    ``paramiko.DSSKey`` second; the first constructor that does not raise
    ``paramiko.SSHException`` wins.

    :param key_string: private key material as a string
    :return: the parsed key object, or ``None`` when both attempts fail
    """
    for key_class in (paramiko.RSAKey, paramiko.DSSKey):
        try:
            return key_class(file_obj=StringIO(key_string))
        except paramiko.SSHException:
            # Not this key type; fall through to the next candidate.
            continue
    return None
def ssh_key_gen(length=2048, type='rsa', password=None,
                username='jumpserver', hostname=None):
    """Generate an SSH private/public key pair with paramiko.

    :param length: key size in bits, handed to paramiko's ``generate``
    :param type: ``'rsa'`` or ``'dsa'`` (the name shadows the builtin but is
        kept so existing keyword callers continue to work)
    :param password: optional passphrase forwarded to ``write_private_key``
    :param username: forwarded to ``ssh_pubkey_gen``
    :param hostname: forwarded to ``ssh_pubkey_gen``; defaults to
        ``os.uname()[1]`` when ``None``
    :return: ``(private_key, public_key)`` where ``private_key`` is the
        serialized private key text and ``public_key`` is whatever
        ``ssh_pubkey_gen`` returns
    :raises IOError: if ``type`` is unsupported, or if an ``IOError`` occurs
        while generating or serializing the key
    """
    # Validate before entering the try block: previously this IOError was
    # raised inside it, caught by the handler below, and replaced by the
    # generic message, hiding the real cause from the caller.
    if type not in ('rsa', 'dsa'):
        raise IOError('SSH private key must be `rsa` or `dsa`')

    if hostname is None:
        hostname = os.uname()[1]

    key_class = paramiko.RSAKey if type == 'rsa' else paramiko.DSSKey
    f = StringIO()
    try:
        private_key_obj = key_class.generate(length)
        private_key_obj.write_private_key(f, password=password)
        private_key = f.getvalue()
        public_key = ssh_pubkey_gen(private_key_obj, username=username,
                                    hostname=hostname)
        return private_key, public_key
    except IOError:
        raise IOError('These is error when generate ssh key.')
def test_4_dict_set(self):
    """Check dict-style assignment of host keys on ``paramiko.HostKeys``.

    Assigns a two-key mapping to one hostname and a single key to another,
    then verifies the entry counts and the fingerprints of the stored keys.
    NOTE(review): the expected total of 3 entries implies 'hostfile.temp'
    already contains entries -- presumably written by the test fixture;
    confirm against the test setup.
    """
    host_keys = paramiko.HostKeys('hostfile.temp')
    rsa_key = paramiko.RSAKey(data=decodebytes(keyblob))
    dss_key = paramiko.DSSKey(data=decodebytes(keyblob_dss))

    host_keys['secure.example.com'] = {
        'ssh-rsa': rsa_key,
        'ssh-dss': dss_key,
    }
    host_keys['fake.example.com'] = {}
    host_keys['fake.example.com']['ssh-rsa'] = rsa_key

    self.assertEqual(3, len(host_keys))

    # Number of key types expected for each entry, in iteration order.
    entries = list(host_keys.values())
    for position, expected_count in enumerate((2, 1, 1)):
        self.assertEqual(expected_count, len(entries[position]))

    secure_entry = host_keys['secure.example.com']
    expected_fingerprints = (
        ('ssh-rsa', b'7EC91BB336CB6D810B124B1353C32396'),
        ('ssh-dss', b'4478F0B9A23CC5182009FF755BC1D26C'),
    )
    for key_type, expected in expected_fingerprints:
        fp = hexlify(secure_entry[key_type].get_fingerprint()).upper()
        self.assertEqual(expected, fp)
def ssh_key_string_to_obj(text):
    """Parse private-key text into a paramiko key object.

    RSA is tried first, then DSS.

    :param text: private key material as a string
    :return: the parsed key object, or ``None`` if neither key class
        accepts the text (each failure surfaces as
        ``paramiko.SSHException`` and is swallowed)
    """
    for key_class in (paramiko.RSAKey, paramiko.DSSKey):
        try:
            # Use a fresh buffer for every attempt. The previous version
            # shared one StringIO, so after the RSA attempt had consumed it
            # the DSS attempt saw an empty stream and could never succeed.
            return key_class.from_private_key(StringIO(text))
        except paramiko.SSHException:
            continue
    return None
def handle(self):
    """Serve one incoming connection as an SSH server session.

    Builds a ``paramiko.Transport`` on ``self.request``, loads the RSA and
    DSA host keys and banner named in the ``ssh`` config section, registers
    the SFTP subsystem, creates the per-client ``network`` object and then
    accepts channels until none remain. The network and the server's
    ``docker_trans`` are released when the session ends.
    """
    cfg = self.server.cfg

    transport = paramiko.Transport(self.request)

    rsafile = cfg.get("ssh", "private_rsa")
    dsafile = cfg.get("ssh", "private_dsa")
    rsakey = paramiko.RSAKey(filename=rsafile)
    dsakey = paramiko.DSSKey(filename=dsafile)
    transport.add_server_key(rsakey)
    transport.add_server_key(dsakey)

    transport.local_version = cfg.get("ssh", "banner")

    transport.set_subsystem_handler('sftp', paramiko.SFTPServer,
                                    sftpServer.sftp_server)

    nw = network.network(self.client_address[0],
                         cfg.get("wetland", "docker_addr"))
    nw.create()

    sServer = sshServer.ssh_server(transport=transport, network=nw)

    try:
        transport.start_server(server=sServer)
    except paramiko.SSHException:
        # NOTE(review): this path returns without nw.delete() or
        # docker_trans.close(), unlike the generic handler below. It looks
        # like a leak of the network created above; left unchanged because
        # it may be deliberate -- confirm.
        return
    except Exception as e:
        # print(e) with one argument prints the same on Python 2 and 3.
        print(e)
        nw.delete()
        sServer.docker_trans.close()
        return

    try:
        while True:
            # Wait up to 60 seconds for a channel; the result is not needed,
            # only the transport's remaining channel set is inspected.
            transport.accept(60)
            # no channel left
            if not transport._channels.values():
                break
    except Exception as e:
        print(e)
    finally:
        nw.delete()
        sServer.docker_trans.close()
def get_private_key(self, password=None):
    """Load the private key stored at ``self.private_key_file``.

    The key class is chosen from the first line of the file: a line
    containing ``DSA`` selects ``paramiko.DSSKey``, otherwise a line
    containing ``RSA`` selects ``paramiko.RSAKey``.

    :param password: optional passphrase forwarded to ``from_private_key``
    :return: the loaded key, or ``None`` when no key file is configured
    :raises ValueError: if the first line names neither key type
    """
    if self.private_key_file is None:
        return None

    with open(self.private_key_file, "r") as key_file:
        header = key_file.readline()

    # Marker order matters: 'DSA' is checked before 'RSA'. The class is
    # looked up only for the marker that matches.
    for marker, class_name in (('DSA', 'DSSKey'), ('RSA', 'RSAKey')):
        if marker in header:
            key_type = getattr(paramiko, class_name)
            break
    else:
        raise ValueError("can't identify private key type")

    # Reopen so the parser sees the file from its first line.
    with open(self.private_key_file, "r") as key_file:
        return key_type.from_private_key(key_file, password=password)
def get_user_auth_keys(self, username):
    """Return the keys listed in a user's ``~/.ssh/authorized_keys`` file.

    Results are cached in ``self.users_keys`` per username, so the file is
    read at most once for each user. Only lines of the form
    ``<keytype> <base64-data> [comment]`` with key type ``ssh-rsa`` or
    ``ssh-dss`` produce keys. Lines that cannot be used are skipped instead
    of aborting the whole parse: blank lines, comments, protocol-1 style
    lines (an integer in the first or second field), lines with fewer than
    two fields, other key types and undecodable key data.

    NOTE(review): the first field is always taken as the key type, so a
    line that starts with an options field is ignored -- confirm whether
    options support is wanted.

    :param username: account name appended to ``~`` to locate the home
        directory
    :return: list of key objects (possibly empty); the same list object
        that is stored in ``self.users_keys[username]``
    """
    import base64

    if username in self.users_keys:
        return self.users_keys[username]

    keys = self.users_keys[username] = []

    keyfile = os.path.join(os.path.expanduser("~" + username),
                           ".ssh/authorized_keys")
    if not os.path.exists(keyfile):
        return keys

    def is_int(token):
        # True when the field parses as an integer (a "bits"/exponent field).
        try:
            int(token)
        except ValueError:
            return False
        return True

    with open(keyfile) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            values = line.split()
            if len(values) < 2:
                # Malformed entry; previously this raised IndexError.
                continue

            # XXX For now skip type 1 keys (numeric first or second field).
            if is_int(values[0]) or is_int(values[1]):
                continue

            ktype, data = values[0], values[1]
            if ktype == "ssh-rsa":
                key_class = ssh.RSAKey
            elif ktype == "ssh-dss":
                key_class = ssh.DSSKey
            else:
                continue

            try:
                key = key_class(data=base64.decodebytes(data.encode('ascii')))
            except (ValueError, ssh.SSHException):
                # Bad base64 / non-ASCII text (ValueError) or a blob the key
                # class rejects: ignore this entry, keep the remaining ones.
                continue
            keys.append(key)

    return keys