我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用paramiko.Transport()。
def sftp_upload(host, port, username, password, local, remote):
    """Upload a file, or the regular files directly inside a directory, via SFTP.

    Args:
        host: Remote host name or IP address.
        port: Remote SSH port.
        username: Login name.
        password: Login password.
        local: Local file path, or a local directory. For a directory only
            its immediate regular files are uploaded; sub-directories are
            skipped (there is no recursion).
        remote: Remote file path when ``local`` is a file, or the remote
            directory when ``local`` is a directory.

    Transfer errors are printed rather than raised (best effort, as before).
    Connection and authentication errors still propagate to the caller.
    """
    import posixpath  # remote paths use '/' whatever the local OS is

    sf = paramiko.Transport((host, port))
    try:
        sf.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(sf)
        try:
            if os.path.isdir(local):
                for name in os.listdir(local):
                    src = os.path.join(local, name)
                    if not os.path.isfile(src):
                        # put() cannot upload a directory; skip it instead
                        # of aborting the remaining files.
                        continue
                    sftp.put(src, posixpath.join(remote, name))
            else:
                sftp.put(local, remote)
        except Exception as e:
            print('upload exception:', e)
        finally:
            sftp.close()
    finally:
        # Always release the socket, including when connect() fails.
        sf.close()
def copy_file(hostname, port, username, password, src, dst):
    """Upload local file ``src`` to remote path ``dst`` over SFTP.

    Args:
        hostname: Remote host name or IP address.
        port: Remote SSH port.
        username: Login name.
        password: Login password.
        src: Local file to send.
        dst: Destination path on the remote host.

    Raises:
        Whatever paramiko raises for connection, authentication or transfer
        failures; the connection is closed before the error propagates.
    """
    print(" Connecting to %s \n with username=%s... \n" % (hostname, username))
    # The address must be passed as ONE (host, port) tuple. A second
    # positional argument to Transport is not the port.
    t = paramiko.Transport((hostname, port))
    try:
        t.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(t)
        try:
            print("Copying file: %s to path: %s" % (src, dst))
            sftp.put(src, dst)
        finally:
            sftp.close()
    finally:
        t.close()
def get_remote_file(hostname, port, username, password, remotepath, localpath):
    """Get remote file to local machine.

    Args:
        hostname(str): Remote IP-address
        port(int): Remote SSH port
        username(str): Remote host username for authentication
        password(str): Remote host password for authentication
        remotepath(str): Remote file to download location path
        localpath(str): Local path to save remote file

    Raises:
        Whatever paramiko raises on connection, authentication or transfer
        failure. The transport is closed in every case.

    Examples::

        get_remote_file(host, port, username, password, tar_remotepath, tar_localpath)

    """
    transport = paramiko.Transport((hostname, port))
    try:
        # connect() and from_transport() sit inside the outer try so that a
        # failed login or SFTP negotiation does not leak the open socket.
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            sftp.get(remotepath=remotepath, localpath=localpath)
        finally:
            sftp.close()
    finally:
        transport.close()
def sftp(self, model=None, queue=None):
    """Open a password-authenticated transport and wrap it in an ``SSH`` helper.

    The transport is stored on ``self.sshConn`` before it is connected.

    Args:
        model: Passed through to ``SSH`` unchanged.
        queue: Passed through to ``SSH`` unchanged.

    Returns:
        The ``SSH`` object built around the connected transport.
    """
    endpoint = (self.hostname, self.port)
    self.sshConn = paramiko.Transport(endpoint)
    self.sshConn.connect(username=self.username, password=self.password)
    wrapper = SSH(
        ssh=self.sshConn,
        hostname=self.hostname,
        queue=queue,
        model=model,
    )
    return wrapper
def handle_tcp_ssh(socket, dstport):
    """Serve one accepted TCP connection as an SSH server session.

    Args:
        socket: The accepted client socket (note: shadows the ``socket``
            module inside this function).
        dstport: Not used by this function; kept for interface compatibility.

    Session errors are deliberately swallowed (best effort). The transport,
    if it was created, and the socket are always closed.
    """
    t = None  # stays None when Transport() itself fails
    try:
        t = paramiko.Transport(socket)
        t.local_version = 'SSH-2.0-OpenSSH_6.6.1p1 Ubuntu-2ubuntu2'
        t.load_server_moduli()  # It can be safely commented out if it does not work on your system
        t.add_server_key(host_key_rsa)
        t.add_server_key(host_key_dss)
        server = Server(socket.getpeername())
        t.start_server(server=server)
        t.join()
    except Exception:
        # Best effort: a failed or aborted session must not take the
        # caller down.
        pass
    finally:
        print("-- SSH TRANSPORT CLOSED --")
        if t is not None:
            try:
                t.close()
            except Exception:
                pass
        socket.close()
def handle(self):
    """Run an SSH/SFTP server session on ``self.request`` until it ends.

    Polls once per second and returns when the transport is no longer
    active or ``pysshrp.common.running`` becomes false.
    """
    session = paramiko.Transport(self.request)
    session.add_server_key(pysshrp.common.config.keyData)
    session.set_subsystem_handler('sftp', paramiko.SFTPServer, SFTPInterface)
    session.start_server(server=self.clientthread)

    while True:
        if not (session.is_active() and pysshrp.common.running):
            break
        time.sleep(1)
def __repr__(self):
    """
    Build a short debugging description of this transport and its state.
    """
    pieces = ['<paramiko.Transport at %s' % hex(long(id(self)) & xffffffff)]
    if self.active:
        if self.local_cipher != '':
            bits = self._cipher_info[self.local_cipher]['key-size'] * 8
            pieces.append(' (cipher %s, %d bits)' % (self.local_cipher, bits))
        if self.is_authenticated():
            pieces.append(' (active; %d open channel(s))' % len(self._channels))
        elif self.initial_kex_done:
            pieces.append(' (connected; awaiting auth)')
        else:
            pieces.append(' (connecting)')
    else:
        pieces.append(' (unconnected)')
    pieces.append('>')
    return ''.join(pieces)
def get_cam_config(target_ip, target_user, target_pass, cam_config_loc_on_pi):
    """Download a camera config file over SFTP into ``tempfolder``.

    Args:
        target_ip: Address of the remote host.
        target_user: Login name.
        target_pass: Login password.
        cam_config_loc_on_pi: Remote path of the config file ('/'-separated).

    Returns:
        The local path of the downloaded file, or ``None`` if anything
        failed (the error is printed).
    """
    config_name = cam_config_loc_on_pi.split("/")[-1]
    localpath = tempfolder + config_name
    port = 22
    ssh_tran = None
    try:
        ssh_tran = paramiko.Transport((target_ip, port))
        print("  - connecting transport pipe... " + target_ip + " port:" + str(port))
        ssh_tran.connect(username=target_user, password=target_pass)
        sftp = paramiko.SFTPClient.from_transport(ssh_tran)
        print("looking for " + str(cam_config_loc_on_pi))
        sftp.get(cam_config_loc_on_pi, localpath)
        print("--config file collected from pi--")
        sftp.close()
    except Exception as e:
        print("!!! There was an issue, " + str(e))
        return None
    finally:
        # Close the transport on failures too, so the socket is not leaked.
        if ssh_tran is not None:
            ssh_tran.close()
    return localpath
def get_test_pic(target_ip, target_user, target_pass, image_to_collect):
    """Download one image over SFTP into ``tempfolder``.

    Args:
        target_ip: Address of the remote host.
        target_user: Login name.
        target_pass: Login password.
        image_to_collect: Remote path of the image ('/'-separated).

    Returns:
        The local path of the downloaded image, or ``None`` if anything
        failed (the error is printed).
    """
    img_name = image_to_collect.split("/")[-1]
    localpath = tempfolder + img_name
    port = 22
    ssh_tran = None
    try:
        ssh_tran = paramiko.Transport((target_ip, port))
        print("  - connecting transport pipe... " + target_ip + " port:" + str(port))
        ssh_tran.connect(username=target_user, password=target_pass)
        sftp = paramiko.SFTPClient.from_transport(ssh_tran)
        sftp.get(image_to_collect, localpath)
        print("--image collected from pi--")
        sftp.close()
    except Exception as e:
        print("!!! There was an issue, " + str(e))
        return None
    finally:
        # Close the transport on failures too, so the socket is not leaked.
        if ssh_tran is not None:
            ssh_tran.close()
    return localpath
def get_test_pic_from_list(target_ip, target_user, target_pass, image_list):
    """Download several images over one SFTP connection into ``tempfolder``.

    Args:
        target_ip: Address of the remote host.
        target_user: Login name.
        target_pass: Login password.
        image_list: Iterable of remote image paths ('/'-separated).

    Returns:
        A list of local paths in download order, or ``None`` if the
        connection could not be established (the error is printed).

    Raises:
        Any error raised by an individual download; the connection is
        closed before it propagates.
    """
    out_list = []
    port = 22
    ssh_tran = None
    try:
        ssh_tran = paramiko.Transport((target_ip, port))
        print("  - connecting transport pipe... " + target_ip + " port:" + str(port))
        ssh_tran.connect(username=target_user, password=target_pass)
        sftp = paramiko.SFTPClient.from_transport(ssh_tran)
    except Exception as e:
        print("!!! There was an issue, " + str(e))
        if ssh_tran is not None:
            ssh_tran.close()
        return None
    try:
        for image_to_collect in image_list:
            print(image_to_collect)
            img_name = image_to_collect.split("/")[-1]
            localpath = tempfolder + img_name
            print(localpath)
            sftp.get(image_to_collect, localpath)
            print("--image " + str(image_to_collect) + " collected from pi--")
            out_list.append(localpath)
    finally:
        # Release the connection even when one download fails.
        sftp.close()
        ssh_tran.close()
    return out_list
def get_sftp(hostname, port=22, user='root', keyfile=None, password=None, keypw=None, pkey=None):
    """Get an SFTP connection using paramiko and plumbum.

    Key authentication is tried first when ``keyfile`` is given. If the
    server rejects the key, password authentication is attempted.

    Args:
        hostname: Remote host name or IP address.
        port: Remote SSH port.
        user: Login name.
        keyfile: Path to a private key file; enables key authentication.
        password: Password used for the password-authentication attempt.
        keypw: Passphrase handed to ``get_pkey`` when loading ``keyfile``.
        pkey: Already-loaded key object; takes precedence over ``keyfile``
            loading when truthy.

    Returns:
        A ``paramiko.SFTPClient`` bound to a connected transport.
    """
    def _open_transport(**kwargs):
        # The address must be ONE (host, port) tuple. A second positional
        # argument to Transport is not the port.
        transport = paramiko.Transport((hostname, port))
        try:
            transport.connect(**kwargs)
        except Exception:
            transport.close()  # do not leak the socket of a failed attempt
            raise
        return transport

    if keyfile:
        pkey = pkey or get_pkey(keyfile, password=keypw)
        try:
            transport = _open_transport(username=user, pkey=pkey)
            return paramiko.SFTPClient.from_transport(transport)
        except paramiko.ssh_exception.AuthenticationException:
            log.warning("Failed to auth ssh with keyfile {}".format(keyfile))
    # The password is intentionally not written to the log.
    log_message = "Trying SSH connection to {} with credentials {}:{}"
    log.info(log_message.format(hostname, user, "<redacted>"))
    transport = _open_transport(username=user, password=password)
    return paramiko.SFTPClient.from_transport(transport)
def sockthread(self):
    """Accept connections forever, starting an SSH server session for each.

    Returns when ``accept()`` raises or yields a falsy connection. For every
    accepted connection the new transport replaces ``self.transport`` and a
    daemon thread running ``self.transportthread`` is started.
    """
    while True:
        try:
            conn, addr = self.listensock.accept()
            if not conn:
                return
        except:  # any failure while accepting ends the loop
            return

        session = paramiko.Transport(conn)
        self.transport = session
        self.transport.add_server_key(self.server_key)
        if self.enable_sftp:
            self.transport.set_subsystem_handler('sftp', SFTPHandler)
        self.transport.start_server(server=self)

        worker = threading.Thread(target=self.transportthread)
        worker.setDaemon(True)
        worker.start()
def __init__(self, sock, addr):
    """Set up the command tree and start an SSH server session on ``sock``.

    Args:
        sock: Connected client socket handed to ``paramiko.Transport``.
        addr: Not used in this constructor.
    """
    super(SshServer, self).__init__()

    # Adapt InternalApi and RootCmd for commands unrelated to BGP.
    stderr_handler = logging.StreamHandler(sys.stderr)
    self.api = InternalApi(log_handler=stderr_handler)
    setattr(self.api, 'sshserver', self)
    self.root = RootCmd(self.api)
    for name, handler in (('help', self.HelpCmd), ('quit', self.QuitCmd)):
        self.root.subcommands[name] = handler

    session = paramiko.Transport(sock)
    session.load_server_moduli()
    session.add_server_key(self._find_ssh_server_key())
    self.transport = session
    session.start_server(server=self)
def init_loopback():
    """Create an in-process SFTP client/server pair over linked loop sockets.

    Publishes the client transport as the global ``tc`` and the SFTP client
    as the global ``sftp``.
    """
    global sftp, tc

    server_sock = LoopSocket()
    client_sock = LoopSocket()
    client_sock.link(server_sock)

    tc = paramiko.Transport(client_sock)
    server_transport = paramiko.Transport(server_sock)

    key_path = test_path('test_rsa.key')
    server_transport.add_server_key(paramiko.RSAKey.from_private_key_file(key_path))

    started = threading.Event()
    stub = StubServer()
    server_transport.set_subsystem_handler('sftp', paramiko.SFTPServer, StubSFTPServer)
    server_transport.start_server(started, stub)

    tc.connect(username='slowdive', password='pygmalion')
    started.wait(1.0)

    sftp = paramiko.SFTP.from_transport(tc)
def __repr__(self):
    """
    Describe this transport (address tag plus connection state) for debugging.
    """
    text = '<paramiko.Transport at %s' % hex(long(id(self)) & xffffffff)
    if not self.active:
        return text + ' (unconnected)' + '>'

    if self.local_cipher != '':
        key_bits = self._cipher_info[self.local_cipher]['key-size'] * 8
        text += ' (cipher %s, %d bits)' % (self.local_cipher, key_bits)

    if self.is_authenticated():
        state = ' (active; %d open channel(s))' % len(self._channels)
    elif self.initial_kex_done:
        state = ' (connected; awaiting auth)'
    else:
        state = ' (connecting)'
    return text + state + '>'
def getpeername(self):
    """
    Report the remote address of this Transport when the socket can tell us.

    Delegates to ``getpeername`` of the underlying socket-like object. When
    that object offers no such method, ``("unknown", 0)`` is returned.

    :return:
        the remote address as a ``(str, int)`` tuple, if known.
    """
    lookup = getattr(self.sock, 'getpeername', None)
    return lookup() if lookup is not None else ('unknown', 0)
def transferdb():
    """Upload every planned file to ``/work/db/`` on each host of the plan.

    The plan comes from ``assignplan()`` and is iterated as a mapping of
    host IP to a sequence of local file paths. The module-level ``username``
    and ``password`` are used for login, and ``showprocess`` is passed as
    the transfer progress callback. ``curp`` is reset to 0 before each file.
    """
    global curp
    plan = assignplan()
    for ip in plan:
        ssh = paramiko.Transport((ip, 22))
        try:
            ssh.connect(username=username, password=password)
            sftp = paramiko.SFTPClient.from_transport(ssh)
            print("################################ %s ################################## " % ip)
            for f in plan[ip]:
                # Two spaces reproduce the output of the old
                # print-statement form (`print "\nPut ", f`).
                print("\nPut  %s" % f)
                curp = 0
                fname = f.split('/')[-1]
                sftp.put(f, '/work/db/' + fname, callback=showprocess)
            sftp.close()
        finally:
            # One transport per host: close it so sockets do not pile up.
            ssh.close()
def close(self):
    """Shut down the SFTP session and SSH transport, then drop log handlers."""
    # SFTP session first.
    if self._sftp_live:
        self._sftp.close()
        self._sftp_live = False

    # Then the SSH transport underneath it.
    transport = self._transport
    if transport:
        transport.close()
        self._transport = None

    # Logging was not enabled: nothing else to clean up.
    if not self._cnopts.log:
        return

    # Active handlers would otherwise stay attached until the app exits.
    import logging
    paramiko_logger = logging.getLogger("paramiko")
    if paramiko_logger:
        paramiko_logger.handlers = []
def _connect_paramiko(self):
    """Connect, verify the host key, authenticate, then open an SFTP client.

    The transport is stored on ``self.transport`` and the client on
    ``self.sftp``. If the host-key check raises, the transport is closed,
    ``self.transport`` is reset to ``None`` and the error is re-raised.
    """
    host = self.host
    port = self.port or 22  # fall back to the standard SSH port
    logging.debug('connect_paramiko: host=%s port=%s', host, port)

    self.transport = paramiko.Transport((host, port))
    self.transport.connect()
    logging.debug('connect_paramiko: connected')

    try:
        self._check_host_key(self.host)
    except BaseException:
        self.transport.close()
        self.transport = None
        raise
    logging.debug('connect_paramiko: host key checked')

    self._authenticate(self.user)
    logging.debug('connect_paramiko: authenticated')

    self.sftp = paramiko.SFTPClient.from_transport(self.transport)
    logging.debug('connect_paramiko: end')
def print_remote_cpu_info(hostname, port, username, password):
    """Run ``COMMAND`` on a remote host and print its exit status and output.

    Args:
        hostname: Remote host name or IP address.
        port: Remote SSH port.
        username: Login name.
        password: Login password.

    Output is read in chunks of ``RECV_BYTES``. Pending stdout/stderr data
    is read before the exit status is honoured, so output that is already
    buffered when the command finishes is not dropped.
    """
    import time  # only needed for the idle back-off below

    client = paramiko.Transport((hostname, port))
    try:
        client.connect(username=username, password=password)
        session = client.open_channel(kind='session')
        try:
            stdout_data = []
            stderr_data = []
            session.exec_command(COMMAND)
            while True:
                got_data = False
                if session.recv_ready():
                    stdout_data.append(session.recv(RECV_BYTES))
                    got_data = True
                if session.recv_stderr_ready():
                    stderr_data.append(session.recv_stderr(RECV_BYTES))
                    got_data = True
                if got_data:
                    continue  # keep draining before checking for exit
                if session.exit_status_ready():
                    break
                time.sleep(0.01)  # avoid spinning a core while idle

            # Two spaces reproduce the old print-statement output.
            print('exit status:  %s' % session.recv_exit_status())
            print(b''.join(stdout_data).decode('utf-8', 'replace'))
            print(b''.join(stderr_data).decode('utf-8', 'replace'))
        finally:
            session.close()
    finally:
        client.close()
def sftp_conn(self):
    """Open an SFTP session to ``self.host`` using password authentication.

    On success the transport is stored on ``self.transport`` and the client
    on ``self.sftp``.

    Returns:
        ``True`` when connected; ``False`` when authentication or the
        connection failed (the failure is logged through ``G.log``).
    """
    transport = None
    try:
        transport = self.transport = paramiko.Transport((self.host, self.port))
        transport.connect(username=self.username, password=self.password)
        self.sftp = paramiko.SFTPClient.from_transport(transport)
        return True
    except paramiko.AuthenticationException:
        G.log(G.ERROR, "SFTP Authentication failed when connecting to %s" % self.host)
    except Exception:
        # Exception, not a bare except: Ctrl-C and SystemExit must still
        # get through to the caller.
        G.log(G.ERROR, "SFTP Could not SSH to %s, waiting for it to start" % self.host)
    # Failure path only: release the half-open connection.
    if transport is not None:
        transport.close()
    return False

#----------------------------------------------------------------------
def handle(self, *args, **kwargs):
    """Serve one SSH connection: wait for a command and dispatch it.

    Waits up to 20 seconds for a channel, then polls the channel every
    0.25 s until its ``spoonybard_command`` attribute is set or
    ``command_timeout`` seconds have passed. The first word of the command
    selects a handler from ``spoonybard.engine.plugins``; the rest is
    passed to it as the argument string.

    The channel and the transport are closed on every exit path.
    """
    t = paramiko.Transport(self.request)
    try:
        t.add_server_key(host_key)

        # Note that this actually spawns a new thread to handle the requests.
        # (Actually, paramiko.Transport is a subclass of Thread)
        t.start_server(server=self.server)

        # wait for auth
        chan = t.accept(20)
        if chan is None:
            # Checked before polling: a missing channel can never receive
            # a command, so waiting out the timeout would be pointless.
            print('No channel')
            return

        try:
            time_taken = 0.0
            while getattr(chan, 'spoonybard_command', False) is False:
                time.sleep(0.25)
                time_taken += 0.25
                if time_taken > command_timeout:
                    break

            request_line = getattr(chan, 'spoonybard_command', False)
            if request_line:
                # First word is the command; everything after the first
                # space is handed to the handler verbatim.
                cmd, _, cmd_args = request_line.partition(' ')
                command = spoonybard.engine.plugins.get_command_handler(cmd)
                if command is None:
                    chan.send('Sorry, %s is an unknown command.\n' % cmd)
                elif command.parse_args(cmd_args):
                    command.execute(chan)
                else:
                    command.help(chan)
        finally:
            chan.close()
    finally:
        t.close()
def _connect(self):
    """Open the transport for ``self.url`` and start an SFTP client on it.

    The port falls back to ``self._DEFAULT_PORT`` when the URL has none.
    After the client is created, ``chdir()`` is called with no argument.
    """
    port = self.url.port or self._DEFAULT_PORT
    self._transport = paramiko.Transport((self.url.host, port))
    self._transport.connect(
        self._get_hostkey(),
        self.url.user,
        self.url.password,
        gss_host=socket.getfqdn(self.url.host),
        gss_auth=self.GSS_AUTH,
        gss_kex=self.GSS_KEX,
    )
    self.__client = paramiko.SFTPClient.from_transport(self._transport)
    self.__client.chdir()
def _get_sftp_client(username, apikey):
    """Return a paramiko SFTP client logged in to drop.jarvice.com.

    Args:
        username(str): Nimbix username for platform.jarvice.com
        apikey(str): Nimbix apikey for platform.jarvice.com, sent as the
            password
    """
    endpoint = ('drop.jarvice.com', 22)
    link = paramiko.Transport(endpoint)
    link.connect(username=username, password=apikey)
    return paramiko.SFTPClient.from_transport(link)
def thread(self, model, queue, cmd=None, cmd_args=None, localPath=None,
           remotePath=None, remoteFile=None):
    """Open the connection type that ``model`` needs and build an ``SSH`` worker.

    ``'command'`` uses an ``SSHClient`` with an auto-add host-key policy.
    ``'upload'`` and ``'download'`` use a raw ``Transport``. Any other
    ``model`` does nothing. The connection is stored on ``self.sshConn``
    and the worker on ``self.threads``.
    """
    if model == 'command':
        self.sshConn = paramiko.SSHClient()
        self.sshConn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.sshConn.connect(hostname=self.hostname, port=self.port,
                             username=self.username, password=self.password)
    elif model in ('upload', 'download'):
        self.sshConn = paramiko.Transport((self.hostname, self.port))
        self.sshConn.connect(username=self.username, password=self.password)
    else:
        return

    # Both connection kinds are wrapped in exactly the same way.
    self.threads = SSH(ssh=self.sshConn, hostname=self.hostname,
                       queue=queue, model=model, cmd=cmd,
                       cmd_args=cmd_args, localPath=localPath,
                       remotePath=remotePath, remoteFile=remoteFile)
def __repr__(self):
    """
    Returns a string representation of this object, for debugging.

    The address tag is ``id(self)`` masked to its low 32 bits. The mask is
    written as a plain ``0xffffffff`` so the method also parses on
    Python 3, where the ``L`` literal suffix and ``long`` do not exist.

    @rtype: str
    """
    out = '<paramiko.Transport at %s' % hex(id(self) & 0xffffffff)
    if not self.active:
        out += ' (unconnected)'
    else:
        if self.local_cipher != '':
            out += ' (cipher %s, %d bits)' % (self.local_cipher,
                                               self._cipher_info[self.local_cipher]['key-size'] * 8)
        if self.is_authenticated():
            out += ' (active; %d open channel(s))' % len(self._channels)
        elif self.initial_kex_done:
            out += ' (connected; awaiting auth)'
        else:
            out += ' (connecting)'
    out += '>'
    return out
def atfork(self):
    """
    Shut this Transport down locally after a fork.

    As documented for this method: when a process forks on posix systems
    while a Transport is open, parent and child share the underlying
    socket, yet only one of them may keep using the connection without
    corrupting the session. The process that gives the connection up calls
    this method to clean up its own Transport object without disturbing
    the other process.

    This implementation simply delegates to ``close()``.

    @since: 1.5.3
    """
    self.close()
def load_server_moduli(filename=None):
    """
    I{(optional)} Load prime moduli for server-mode "group-exchange" key
    negotiation. This is an obscure option that can safely be ignored.

    A client may ask a server for "group-exchange" negotiation, in which the
    server must supply a random prime meeting certain criteria. Such primes
    are too expensive to compute on demand, but many systems ship a file of
    them (usually named something like C{/etc/ssh/moduli}). When this
    function returns C{True}, that file has been loaded and group-exchange
    is supported in server mode; otherwise server mode will report that the
    method is unsupported.

    @param filename: optional path to the moduli file, tried before the
        standard locations.
    @type filename: str
    @return: True if a moduli file was successfully loaded; False otherwise.
    @rtype: bool

    @note: This has no effect when used in client mode.
    """
    Transport._modulus_pack = ModulusPack(rng)
    # Standard locations of the openssh "moduli" file; a caller-supplied
    # path is tried first.
    candidates = ['/etc/ssh/moduli', '/usr/local/etc/moduli']
    if filename is not None:
        candidates = [filename] + candidates
    for path in candidates:
        try:
            Transport._modulus_pack.read_file(path)
        except IOError:
            continue
        return True
    # Nothing could be read: forget the empty pack.
    Transport._modulus_pack = None
    return False
def transfer_file(hostname_, port_, username_, password_, fdir_, fname_):
    """Upload ``fdir_/fname_`` to ``DIR_REMOTE`` via SFTP, then delete it locally.

    Args:
        hostname_: Remote host name or IP address.
        port_: Remote SSH port.
        username_: Login name.
        password_: Login password.
        fdir_: Local directory that holds the file.
        fname_: File name; also used as the remote file name.

    The local file is removed only after a successful upload. Any failure
    is printed, not raised, and the transport is closed if it was opened.
    """
    t = None  # stays None when Transport() itself fails
    try:
        print('Establishing SSH connection to:', hostname_, port_, '...')
        t = paramiko.Transport((hostname_, port_))
        t.start_client()
        if not t.is_authenticated():
            print('Trying password login...')
            t.auth_password(username=username_, password=password_)
        sftp = paramiko.SFTPClient.from_transport(t)
        local_file = os.path.join(fdir_, fname_)
        remote_file = DIR_REMOTE + '/' + fname_
        try:
            print('start transport...')
            sftp.put(local_file, remote_file)
        except Exception:
            LOG.error('error...')
            raise
        t.close()
        LOG.info('??????????zip??...')
        os.remove(local_file)
    except Exception as e:
        print(e)
        LOG.info('end transport and close it...')
        if t is not None:
            try:
                t.close()
            except Exception:
                # Closing is best effort; the original error was already
                # reported above.
                pass
def init_ssh_client(self):
    """Create the outbound transport to ``self.destination`` and start it.

    The transport is stored on ``self.destt``. A failed SSH negotiation is
    logged and not raised; the method returns ``None`` either way.
    """
    self.log.debug("SSHProtocol: Initializing the destination transport")
    outbound = paramiko.Transport(self.destination)
    self.destt = outbound
    try:
        self.destt.start_client()
    except paramiko.SSHException:
        self.log.error("*** SSH negotiation failed.")
        return
def copy_sslcertificate_to_cpx():
    """Upload the test certificates and keys to ``/nsconfig/ssl`` on ``DOCKER_IP``.

    The files are read from the ``datafiles`` directory next to this module
    and sent over SFTP on port 22, logging in as ``root``.
    """
    import posixpath  # remote paths use '/' whatever the local OS is

    # The address must be ONE (host, port) tuple. A second positional
    # argument to Transport is not the port.
    transport = paramiko.Transport((DOCKER_IP, 22))
    try:
        transport.connect(username='root', password='linux')
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            here = os.path.dirname(os.path.realpath(__file__))
            for name in ['server.crt', 'server.key', 'server2.key', 'server2.crt']:
                sftp.put(os.path.join(here, 'datafiles', name),
                         posixpath.join('/nsconfig/ssl', name))
        finally:
            sftp.close()
    finally:
        transport.close()
def print_remote_cpu_info(hostname, port, username, password):
    """Run ``COMMAND`` on a remote host and print its exit status and raw output.

    Args:
        hostname: Remote host name or IP address.
        port: Remote SSH port.
        username: Login name.
        password: Login password.

    Output is read in chunks of ``RECV_BYTES`` and printed as joined bytes.
    Pending stdout/stderr data is read before the exit status is honoured,
    so output that is already buffered when the command finishes is not
    dropped.
    """
    import time  # only needed for the idle back-off below

    client = paramiko.Transport((hostname, port))
    try:
        client.connect(username=username, password=password)
        session = client.open_channel(kind='session')
        try:
            stdout_data = []
            stderr_data = []
            session.exec_command(COMMAND)
            while True:
                got_data = False
                if session.recv_ready():
                    stdout_data.append(session.recv(RECV_BYTES))
                    got_data = True
                if session.recv_stderr_ready():
                    stderr_data.append(session.recv_stderr(RECV_BYTES))
                    got_data = True
                if got_data:
                    continue  # keep draining before checking for exit
                if session.exit_status_ready():
                    break
                time.sleep(0.01)  # avoid spinning a core while idle

            print('exit status: ', session.recv_exit_status())
            print(b''.join(stdout_data))
            print(b''.join(stderr_data))
        finally:
            session.close()
    finally:
        client.close()
def connect_to_sftp_server(username, password, hostname, port):
    """Log in with a password and return an SFTP client for the host.

    Args:
        username: Login name.
        password: Login password.
        hostname: Remote host name or IP address.
        port: Remote SSH port.

    Returns:
        A ``paramiko.SFTPClient`` created from the connected transport.
    """
    link = paramiko.Transport((hostname, port))
    link.connect(username=username, password=password)
    client = paramiko.SFTPClient.from_transport(link)
    return client
def _get_sftp_client(self):
    """
    returns a sftp client object and the corresponding transport socket.
    Both must be closed after use.

    Authentication order:
      * no password (empty string or ``None``) and a key file exists at
        ``self.rsakey_local_path``: log in with that RSA key;
      * no password and no key file: log in with an empty password;
      * otherwise: log in with ``self.password``.

    If anything fails the transport is closed and the error is re-raised.
    """
    transport = paramiko.Transport((self.servername, 22))
    try:
        # `not self.password` covers both "" and None. The former test
        # (`is "" or None`) compared identity with a literal, and its
        # `or None` part could never be true.
        if not self.password:
            rsa_keypath = os.path.expanduser(self.rsakey_local_path)
            if not os.path.isfile(rsa_keypath):
                # login with provided username + empty password
                try:
                    transport.connect(username=self.username, password="")
                except Exception:
                    logger.error("Cannot connect to " + self.servername)
                    logger.error("Check username, password or key in " + self.rsakey_local_path)
                    raise
            else:
                # login with provided username + private key
                rsa_key = paramiko.RSAKey.from_private_key_file(rsa_keypath)
                try:
                    transport.connect(username=self.username, pkey=rsa_key)
                except Exception:
                    logger.error("Cannot connect to " + self.servername)
                    logger.error("Check username and your key in " + self.rsakey_local_path)
                    raise
        else:
            # login with provided username + password
            transport.connect(username=self.username, password=self.password)
        sftp = paramiko.SFTPClient.from_transport(transport)
    except Exception:
        transport.close()  # do not leak the socket of a failed login
        raise
    return sftp, transport
def __init__(self, server_ctl, session_class, extra_args, server, newsocket, addr, debug):
    """Wrap an accepted socket in an SSH server transport and start accepting channels.

    All constructor arguments are stored on the instance first. The
    transport is then started with ``server_ctl`` as its server interface.
    If that raises an authentication error, the client socket is closed and
    the error is re-raised. Finally a daemon thread running
    ``self._accept_chan_thread`` is started.
    """
    self.session_class = session_class
    self.extra_args = extra_args
    self.server = server
    self.client_socket = newsocket
    self.client_addr = addr
    self.debug = debug
    self.server_ctl = server_ctl
    self.sessions = []

    try:
        if self.debug:
            logger.debug("%s: Opening SSH connection", str(self))
        self.ssh = ssh.Transport(self.client_socket)
        self.ssh.add_server_key(self.server.host_key)
        self.ssh.start_server(server=self.server_ctl)
    except ssh.AuthenticationException as error:
        self.client_socket.close()
        self.client_socket = None
        logger.error("Authentication failed: %s", str(error))
        raise

    self.lock = threading.Lock()
    self.running = True
    acceptor = threading.Thread(None, self._accept_chan_thread, name="SSHAcceptThread")
    self.thread = acceptor
    self.thread.daemon = True
    self.thread.start()
def _get_transport_via_ip(self):
    """Open an SSH transport to ``self._hostname`` tunnelled through ``self._client``.

    Makes up to 60 attempts to open a ``direct-tcpip`` channel, sleeping
    one second after each ``ChannelException``. On the first success a
    transport is started on the channel, authenticated with the stored
    private key, and returned.

    Raises:
        Exception: if no channel could be opened in 60 attempts.
    """
    for _attempt in range(60):
        try:
            channel = self._client.get_transport().open_channel(
                'direct-tcpip',
                (self._hostname, 22),
                (self.via_ip, 0))
        except ssh_exception.ChannelException:
            LOG.debug('%s creating the direct-tcip connections' %
                      self.description)
            time.sleep(1)
            continue

        tunnelled = paramiko.Transport(channel)
        tunnelled.start_client()
        tunnelled.auth_publickey(self._user, self._private_key)
        return tunnelled
    raise Exception()
def retry_hack(self):
    """Replace ``self.endpoint`` with a fresh client transport to the remote host.

    Does nothing unless the name ``retry_hack`` is truthy in the enclosing
    scope (the original comment calls it a global flag).
    """
    if not retry_hack:  # check for global flag first
        return

    self.endpoint.close()
    fresh_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    fresh_sock.connect((self.rhost, self.rport))
    self.endpoint = paramiko.Transport(fresh_sock)
    self.endpoint.start_client()
def client_handler_helper(sock, address, dst_sock, kill_switch, hijack_flag):
    """Start the outbound SSH session for one client and hand off to a handler.

    A per-connection log file named after the current timestamp is created
    under ``logs/``. Depending on the module-level ``sniff`` flag, the
    connection is passed to ``ssh_client_handler`` or ``tcp_client_handler``.

    Args:
        sock: Inbound client socket.
        address: ``(host, port)`` of the inbound client.
        dst_sock: Socket already connected to the real destination.
        kill_switch: Passed through to the handler.
        hijack_flag: Passed through to ``ssh_client_handler`` only.
    """
    dt = datetime.datetime.today()
    logfile_name = str(dt) + ".log"
    print_purp("[c.c] Logging to %s" % logfile_name)
    try:
        os.mkdir('logs')
    except OSError:
        # Usually "already exists". Any other filesystem problem will
        # surface when the log file is opened just below.
        pass
    with open("logs/%s" % (logfile_name), "w") as logfile:
        logfile.write("<(^.^)>")
        logfile.write("Inbound connection from %s:%d\n" % address)
        logfile.write("Posing as: %s:%d\n" % (DST_IP, DST_PORT))
        logfile.write("_________________\n")

        # connect outbound ssh connection
        out_trans = paramiko.Transport(dst_sock)
        out_trans.start_client()
        print_attn("[0.<] Started Transport session....")

        if sniff == True:
            ssh_client_handler(sock, address, out_trans, logfile, kill_switch, hijack_flag)
        else:
            tcp_client_handler(sock, address, out_trans, logfile, kill_switch)
def process_request(self, client, addr):
    """Serve one inbound SSH connection inside its own request context.

    A request context carrying the client address is pushed, an SSH server
    transport is started on ``client``, and up to 20 seconds are allowed
    for a channel to arrive. A ``shell`` request runs an
    ``InteractiveServer``. Any other request is answered with a warning and
    the process exits with status 2. A failed negotiation or a missing
    channel exits with status 1.
    """
    rc = self.request_context({'REMOTE_ADDR': addr[0]})
    rc.push()
    logger.info("Get ssh request from %s" % request.environ['REMOTE_ADDR'])

    transport = paramiko.Transport(client, gss_kex=False)
    try:
        transport.load_server_moduli()
    except:  # log whatever went wrong, then let it propagate unchanged
        logger.warning('Failed to load moduli -- gex will be unsupported.')
        raise
    transport.add_server_key(SSHInterface.get_host_key())

    # The interface object receives this app and the pushed request context.
    ssh_interface = SSHInterface(self, rc)
    try:
        transport.start_server(server=ssh_interface)
    except paramiko.SSHException:
        logger.warning('SSH negotiation failed.')
        sys.exit(1)

    client_channel = transport.accept(20)
    if client_channel is None:
        logger.warning('No ssh channel get.')
        sys.exit(1)

    if request.method == 'shell':
        logger.info('Client asked for a shell.')
        InteractiveServer(self, ssh_interface.user_service, client_channel).run()
        return

    # Everything else is rejected; only the wording differs.
    if request.method == 'command':
        rejection = 'We are not support command now'
    else:
        rejection = 'Not support the request method'
    client_channel.send(wr(warning(rejection)))
    client_channel.close()
    sys.exit(2)
def atfork(self):
    """
    Shut this Transport down locally after a fork.

    As documented for this method: when a process forks on posix systems
    while a Transport is open, parent and child share the underlying
    socket, yet only one of them may keep using the connection without
    corrupting the session. The process that gives the connection up calls
    this method to clean up its own Transport object without disturbing
    the other process.

    The socket is closed first, then the Transport itself.

    .. versionadded:: 1.5.3
    """
    shared_sock = self.sock
    shared_sock.close()
    self.close()
def load_server_moduli(filename=None):
    """
    (optional)
    Load prime moduli for server-mode "group-exchange" key negotiation.
    This is an obscure option that can safely be ignored.

    A client may ask a server for "group-exchange" negotiation, in which the
    server must supply a random prime meeting certain criteria. Such primes
    are too expensive to compute on demand, but many systems ship a file of
    them (usually named something like ``/etc/ssh/moduli``). When
    `load_server_moduli` returns ``True``, that file has been loaded and
    group-exchange is supported in server mode; otherwise server mode will
    report that the method is unsupported.

    :param str filename:
        optional path to the moduli file, tried before the standard
        locations.
    :return:
        True if a moduli file was successfully loaded; False otherwise.

    .. note:: This has no effect when used in client mode.
    """
    Transport._modulus_pack = ModulusPack()
    # Standard locations of the openssh "moduli" file; a caller-supplied
    # path is tried first.
    search_paths = ['/etc/ssh/moduli', '/usr/local/etc/moduli']
    if filename is not None:
        search_paths = [filename] + search_paths
    for candidate in search_paths:
        try:
            Transport._modulus_pack.read_file(candidate)
        except IOError:
            continue
        return True
    # Nothing could be read: forget the empty pack.
    Transport._modulus_pack = None
    return False
def getpeername(self):
    """
    Give the address of the peer at the other end of this Transport.

    This forwards to ``'getpeername'`` of the wrapped socket. A socket-like
    object lacking a ``'getpeername'`` method yields ``("unknown", 0)``.

    :return:
        the remote host address, when known, as a ``(str, int)`` tuple.
    """
    peer_lookup = getattr(self.sock, 'getpeername', None)
    if peer_lookup is not None:
        return peer_lookup()
    return ('unknown', 0)
def run(host, port, username, password, blockhouseuser_id=""):
    """Open an interactive shell on a remote host over SSH.

    Args:
        host: Remote host name or IP address.
        port: Remote SSH port.
        username: Login name.
        password: Login password.
        blockhouseuser_id: Stored in the module-level
            ``blockhouseuser_glo_id`` before connecting.

    The channel and the transport are closed on every exit path, including
    when authentication or the shell session raises.
    """
    global blockhouseuser_glo_id
    blockhouseuser_glo_id = blockhouseuser_id

    # Connect and authenticate with the given password.
    tran = paramiko.Transport((host, port,))
    try:
        tran.start_client()
        tran.auth_password(username, password)

        # Open a session channel.
        chan = tran.open_session()
        try:
            # Request a pseudo-terminal, then start the remote shell.
            chan.get_pty()
            chan.invoke_shell()

            interactive_shell(chan)
        finally:
            chan.close()
    finally:
        tran.close()
def connect_server(self):
    """
    Connect to ``self.host`` with the stored username and password.

    The connected transport is kept on the private ``__transport``
    attribute.

    :return: None
    """
    endpoint = (self.host, self.port)
    link = paramiko.Transport(endpoint)
    link.connect(username=self.username, password=self.password)
    self.__transport = link