我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 pty.STDIN_FILENO。
def do_execute(self, stdin_file, stdout_file, stderr_file, cgroup_file):
    """Fork a child that execs ``self.execute_file`` with optional redirection.

    In the child: change directory to ``/in/package``, redirect each standard
    stream whose file argument is truthy, optionally enter the cgroup named by
    ``cgroup_file``, then replace the process image with ``self.execute_file``
    (arguments ``self.execute_args``, environment ``SPAWN_ENV``).

    In the parent: return whatever ``wait_and_reap_zombies(pid)`` returns.

    If any step in the child fails, the child terminates with exit status 127
    instead of propagating the exception.  Without that, the failed child would
    unwind into the caller's code and run it a second time in a forked copy of
    the parent.
    """
    # Local import: os._exit() terminates the child without running the
    # parent's cleanup handlers or flushing its inherited stdio buffers.
    from os import _exit

    pid = fork()
    if not pid:
        # Child process.
        try:
            chdir('/in/package')
            if stdin_file:
                fd = os_open(stdin_file, O_RDONLY)
                dup2(fd, STDIN_FILENO)
                os_close(fd)
            if stdout_file:
                fd = os_open(stdout_file, O_WRONLY)
                dup2(fd, STDOUT_FILENO)
                os_close(fd)
            if stderr_file:
                fd = os_open(stderr_file, O_WRONLY)
                dup2(fd, STDERR_FILENO)
                os_close(fd)
            if cgroup_file:
                enter_cgroup(cgroup_file)
            execve(self.execute_file, self.execute_args, SPAWN_ENV)
        finally:
            # Only reachable when setup or execve() failed; a successful
            # execve() never returns.
            _exit(127)
    return wait_and_reap_zombies(pid)
def test__copy_eof_on_all(self):
    """Exercise pty._copy() when both master_fd and stdin hit EOF."""
    # Point pty's stdout and stdin at pipes owned by this test.
    stdout_read_end, stdout_write_end = self._pipe()
    pty.STDOUT_FILENO = stdout_write_end
    stdin_read_end, stdin_write_end = self._pipe()
    pty.STDIN_FILENO = stdin_read_end

    pair = socket.socketpair()
    masters = [sock.fileno() for sock in pair]
    self.fds.extend(masters)

    # Close the far ends so that reads on the near ends return EOF.
    os.close(masters[1])
    pair[1].close()
    os.close(stdin_write_end)

    # Two select calls are expected; the second one raises IndexError.
    # By then both fds should have been dropped from the list because each
    # saw EOF, hence the expected length of 0 for the second call.
    pty.select = self._mock_select
    self.select_rfds_lengths.extend([2, 0])
    self.select_rfds_results.append([stdin_read_end, masters[0]])

    with self.assertRaises(IndexError):
        pty._copy(masters[0])
def test__copy_eof_on_all(self):
    """Exercise pty._copy() when both master_fd and stdin hit EOF."""
    # Point pty's stdout and stdin at pipes owned by this test.
    stdout_read_end, stdout_write_end = self._pipe()
    pty.STDOUT_FILENO = stdout_write_end
    stdin_read_end, stdin_write_end = self._pipe()
    pty.STDIN_FILENO = stdin_read_end

    pair = self._socketpair()
    masters = [sock.fileno() for sock in pair]

    # Close the far ends so that reads on the near ends return EOF.
    os.close(masters[1])
    pair[1].close()
    os.close(stdin_write_end)

    # Two select calls are expected; the second one raises IndexError.
    # By then both fds should have been dropped from the list because each
    # saw EOF, hence the expected length of 0 for the second call.
    pty.select = self._mock_select
    self.select_rfds_lengths.extend([2, 0])
    self.select_rfds_results.append([stdin_read_end, masters[0]])

    with self.assertRaises(IndexError):
        pty._copy(masters[0])
def _copy(self):
    '''
    Select loop over the child pty and stdin.

    Each iteration waits until ``self.master_fd`` or ``pty.STDIN_FILENO`` is
    readable, then calls ``self._has_child_response()`` and/or
    ``self._has_user_input()`` for whichever is ready.  The loop ends as soon
    as an iteration reports no progress from either handler.
    '''
    assert self.master_fd is not None
    child_fd = self.master_fd

    while True:
        readable, _writable, _exceptional = select.select(
            [child_fd, pty.STDIN_FILENO], [], [])

        progressed = False
        if child_fd in readable:
            progressed |= self._has_child_response()
        if pty.STDIN_FILENO in readable:
            progressed |= self._has_user_input()

        if not progressed:
            return
def _copy(self): """Main select loop. Passes all data to self.master_read() or self.stdin_read(). """ assert self.master_fd is not None master_fd = self.master_fd bufsize = self.bufsize while True: try: rfds, wfds, xfds = select.select([master_fd, pty.STDIN_FILENO], [], []) except OSError as e: if e.errno == 4: # Interrupted system call. continue # This happens at terminal resize. if master_fd in rfds: data = os.read(master_fd, bufsize) self.write_stdout(data) if pty.STDIN_FILENO in rfds: data = os.read(pty.STDIN_FILENO, bufsize) self.write_stdin(data)
def __interact_copy(self, escape_character=None, input_filter=None, output_filter=None):
    """Copy loop used by the interact() method.

    While the child is alive, wait for ``self.child_fd`` or
    ``self.STDIN_FILENO`` to become readable.

    * Child output is passed through ``output_filter`` (if given), written to
      ``self.logfile`` (if set) and then to ``self.STDOUT_FILENO``.
    * User input is passed through ``input_filter`` (if given) and written to
      the child.  If it contains ``escape_character``, only the part before
      its last occurrence is sent and the loop ends.

    ``escape_character`` may be ``None``, which disables escaping.  The check
    is skipped in that case, because ``data.rfind(None)`` raises TypeError.
    """
    while self.isalive():
        r, _w, _e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])

        if self.child_fd in r:
            data = self.__interact_read(self.child_fd)
            if output_filter:
                data = output_filter(data)
            if self.logfile is not None:
                self.logfile.write(data)
                self.logfile.flush()
            os.write(self.STDOUT_FILENO, data)

        if self.STDIN_FILENO in r:
            data = self.__interact_read(self.STDIN_FILENO)
            if input_filter:
                data = input_filter(data)
            i = -1
            if escape_character is not None:
                i = data.rfind(escape_character)
            if i != -1:
                # Send what precedes the escape character, then stop.
                data = data[:i]
                self.__interact_writen(self.child_fd, data)
                break
            self.__interact_writen(self.child_fd, data)
def __interact_copy(self, escape_character=None, input_filter=None, output_filter=None):
    '''Copy loop used by the interact() method.

    While the child is alive, wait for ``self.child_fd`` or
    ``self.STDIN_FILENO`` to become readable.  Child output is filtered,
    logged as 'read' and written to ``self.STDOUT_FILENO``.  User input is
    filtered, logged as 'send' and written to the child.  The loop ends on
    EOF from the child or when the input contains ``escape_character``.
    '''
    while self.isalive():
        readable, _, _ = select_ignore_interrupts(
            [self.child_fd, self.STDIN_FILENO], [], [])

        if self.child_fd in readable:
            try:
                child_data = self.__interact_read(self.child_fd)
            except OSError as err:
                if err.args[0] != errno.EIO:
                    raise
                # Linux-style EOF
                break
            if child_data == b'':
                # BSD-style EOF
                break
            if output_filter:
                child_data = output_filter(child_data)
            self._log(child_data, 'read')
            os.write(self.STDOUT_FILENO, child_data)

        if self.STDIN_FILENO in readable:
            user_data = self.__interact_read(self.STDIN_FILENO)
            if input_filter:
                user_data = input_filter(user_data)

            if escape_character is None:
                escape_at = -1
            else:
                escape_at = user_data.rfind(escape_character)

            if escape_at == -1:
                # No escape character: forward everything and keep looping.
                self._log(user_data, 'send')
                self.__interact_writen(self.child_fd, user_data)
                continue

            # Forward only what precedes the escape character, then stop.
            user_data = user_data[:escape_at]
            if user_data:
                self._log(user_data, 'send')
            self.__interact_writen(self.child_fd, user_data)
            break
def do_build(self, output_file, cgroup_file):
    """Fork a child that execs ``self.compiler_file`` with output redirected.

    In the child: change directory to ``/out``, close stdin, send both stdout
    and stderr to ``output_file`` when it is truthy, optionally enter the
    cgroup named by ``cgroup_file``, then replace the process image with
    ``self.compiler_file`` (arguments ``self.compiler_args``, environment
    ``SPAWN_ENV``).

    In the parent: return whatever ``wait_and_reap_zombies(pid)`` returns.

    If any step in the child fails, the child terminates with exit status 127
    instead of propagating the exception.  Without that, the failed child would
    unwind into the caller's code and run it a second time in a forked copy of
    the parent.
    """
    # Local import: os._exit() terminates the child without running the
    # parent's cleanup handlers or flushing its inherited stdio buffers.
    from os import _exit

    pid = fork()
    if not pid:
        # Child process.
        try:
            chdir('/out')
            os_close(STDIN_FILENO)
            if output_file:
                fd = os_open(output_file, O_WRONLY)
                dup2(fd, STDOUT_FILENO)
                dup2(fd, STDERR_FILENO)
                os_close(fd)
            if cgroup_file:
                enter_cgroup(cgroup_file)
            execve(self.compiler_file, self.compiler_args, SPAWN_ENV)
        finally:
            # Only reachable when setup or execve() failed; a successful
            # execve() never returns.
            _exit(127)
    return wait_and_reap_zombies(pid)
def setUp(self):
    """Record the pty globals that tests patch and reset per-test state."""
    # Per-test bookkeeping.
    self.fds = []  # A list of file descriptors to close.
    self.select_rfds_lengths = []
    self.select_rfds_results = []

    # Originals of the pty module attributes that tests overwrite.
    self.orig_pty_select = pty.select
    self.orig_stdout_fileno = pty.STDOUT_FILENO
    self.orig_stdin_fileno = pty.STDIN_FILENO
def tearDown(self):
    """Restore the patched pty globals and close the recorded descriptors.

    Closing is best-effort: a descriptor in ``self.fds`` may already have been
    closed by the test, so ``OSError`` from ``os.close`` is ignored.  Other
    exceptions (for example KeyboardInterrupt) are no longer swallowed.
    """
    pty.STDIN_FILENO = self.orig_stdin_fileno
    pty.STDOUT_FILENO = self.orig_stdout_fileno
    pty.select = self.orig_pty_select
    for fd in self.fds:
        try:
            os.close(fd)
        except OSError:
            # Already closed (or never valid); nothing left to clean up.
            pass
def test__copy_to_each(self):
    """Exercise pty._copy() with ordinary data on both master_fd and stdin."""
    # Point pty's stdout and stdin at pipes owned by this test.
    stdout_read_end, stdout_write_end = self._pipe()
    pty.STDOUT_FILENO = stdout_write_end
    stdin_read_end, stdin_write_end = self._pipe()
    pty.STDIN_FILENO = stdin_read_end

    pair = socket.socketpair()
    masters = [sock.fileno() for sock in pair]
    self.fds.extend(masters)

    # Feed data. Smaller than PIPEBUF. These writes will not block.
    os.write(masters[1], b'from master')
    os.write(stdin_write_end, b'from stdin')

    # Two select calls are expected; the second one raises IndexError.
    pty.select = self._mock_select
    self.select_rfds_lengths.extend([2, 2])
    self.select_rfds_results.append([stdin_read_end, masters[0]])

    with self.assertRaises(IndexError):
        pty._copy(masters[0])

    # Check that each payload arrived at the opposite endpoint.
    ready, _, _ = select.select([stdout_read_end, masters[1]], [], [], 0)
    self.assertEqual([stdout_read_end, masters[1]], ready)
    self.assertEqual(os.read(stdout_read_end, 20), b'from master')
    self.assertEqual(os.read(masters[1], 20), b'from stdin')
def setUp(self):
    """Record the pty globals that tests patch and reset per-test state."""
    # Per-test bookkeeping.
    self.fds = []  # A list of file descriptors to close.
    self.files = []
    self.select_rfds_lengths = []
    self.select_rfds_results = []

    # Originals of the pty module attributes that tests overwrite.
    self.orig_pty_select = pty.select
    self.orig_stdout_fileno = pty.STDOUT_FILENO
    self.orig_stdin_fileno = pty.STDIN_FILENO
def tearDown(self):
    """Restore the patched pty globals, then close recorded files and fds.

    Closing is best-effort: ``OSError`` from either kind of close is ignored.
    """
    pty.select = self.orig_pty_select
    pty.STDOUT_FILENO = self.orig_stdout_fileno
    pty.STDIN_FILENO = self.orig_stdin_fileno

    # File objects first, then raw descriptors.
    for handle in self.files:
        try:
            handle.close()
        except OSError:
            pass

    for descriptor in self.fds:
        try:
            os.close(descriptor)
        except OSError:
            pass
def test__copy_to_each(self):
    """Exercise pty._copy() with ordinary data on both master_fd and stdin."""
    # Point pty's stdout and stdin at pipes owned by this test.
    stdout_read_end, stdout_write_end = self._pipe()
    pty.STDOUT_FILENO = stdout_write_end
    stdin_read_end, stdin_write_end = self._pipe()
    pty.STDIN_FILENO = stdin_read_end

    pair = self._socketpair()
    masters = [sock.fileno() for sock in pair]

    # Feed data. Smaller than PIPEBUF. These writes will not block.
    os.write(masters[1], b'from master')
    os.write(stdin_write_end, b'from stdin')

    # Two select calls are expected; the second one raises IndexError.
    pty.select = self._mock_select
    self.select_rfds_lengths.extend([2, 2])
    self.select_rfds_results.append([stdin_read_end, masters[0]])

    with self.assertRaises(IndexError):
        pty._copy(masters[0])

    # Check that each payload arrived at the opposite endpoint.
    ready, _, _ = select.select([stdout_read_end, masters[1]], [], [], 0)
    self.assertEqual([stdout_read_end, masters[1]], ready)
    self.assertEqual(os.read(stdout_read_end, 20), b'from master')
    self.assertEqual(os.read(masters[1], 20), b'from stdin')
def _init_tty(self):
    """Save the terminal attributes of stdin in ``self.mode`` and go raw.

    A ``tty.error`` from either step is ignored.  If reading the attributes
    fails, ``self.mode`` is left untouched by this method.
    """
    try:
        saved_attrs = tty.tcgetattr(pty.STDIN_FILENO)
        self.mode = saved_attrs
        # Original author's note: this seems to change the behavior of the
        # stdout as well.
        tty.setraw(pty.STDIN_FILENO)
    except tty.error:
        pass
def _del_tty(self): if self.mode: # restore the tty mode for stdout tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, self.mode)
def _read_stdin(self, bufsize): return os.read(pty.STDIN_FILENO, bufsize)
def __fork_pty(self):
    '''Substitute for the forkpty system call.

    This should be more portable than the pty.fork() function.  Specifically,
    this should work on Solaris.

    Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
    resolve the issue with Python's pty.fork() not supporting Solaris,
    particularly ssh.  Based on patch to posixmodule.c authored by Noah
    Spurrier::

        http://mail.python.org/pipermail/python-dev/2003-May/035281.html

    Returns a ``(pid, parent_fd)`` tuple.  In the child, ``pid`` equals
    ``pty.CHILD``, the pty's child end has been duplicated onto the three
    standard descriptors, and ``parent_fd`` has already been closed.

    Raises ``ExceptionPexpect`` if ``os.openpty()`` reports negative
    descriptors, and re-raises ``OSError`` from ``os.fork()`` after closing
    both pty descriptors.
    '''
    parent_fd, child_fd = os.openpty()
    if parent_fd < 0 or child_fd < 0:
        raise ExceptionPexpect("Could not open with os.openpty().")

    try:
        pid = os.fork()
    except OSError:
        # Do not leak the freshly opened pty pair when fork fails.
        os.close(parent_fd)
        os.close(child_fd)
        raise

    if pid == pty.CHILD:
        # Child.
        os.close(parent_fd)
        self.__pty_make_controlling_tty(child_fd)

        os.dup2(child_fd, self.STDIN_FILENO)
        os.dup2(child_fd, self.STDOUT_FILENO)
        os.dup2(child_fd, self.STDERR_FILENO)

        # The duplicates keep the pty open; drop the original descriptor so it
        # is not leaked into the child, unless it is itself a standard one.
        if child_fd not in (self.STDIN_FILENO, self.STDOUT_FILENO,
                            self.STDERR_FILENO):
            os.close(child_fd)
    else:
        # Parent.
        os.close(child_fd)

    return pid, parent_fd
def __interact_copy(self, escape_character=None, input_filter=None, output_filter=None):
    '''Copy loop used by the interact() method.

    While the child is alive, wait for ``self.child_fd`` or
    ``self.STDIN_FILENO`` to become readable.

    * Child output is passed through ``output_filter`` (if given), written to
      ``self.logfile`` (if set) and then to ``self.STDOUT_FILENO``.  An EIO
      error or an empty read from the child is treated as EOF and ends the
      loop.
    * User input is passed through ``input_filter`` (if given) and written to
      the child.  If it contains ``escape_character``, only the part before
      its last occurrence is sent and the loop ends.

    ``escape_character`` may be ``None``, which disables escaping.  The check
    is skipped in that case, because ``data.rfind(None)`` raises TypeError.
    '''
    while self.isalive():
        r, _w, _e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])

        if self.child_fd in r:
            try:
                data = self.__interact_read(self.child_fd)
            except OSError as err:
                if err.args[0] == errno.EIO:
                    # Linux-style EOF
                    break
                raise
            if data == b'':
                # BSD-style EOF
                break
            if output_filter:
                data = output_filter(data)
            if self.logfile is not None:
                self.logfile.write(data)
                self.logfile.flush()
            os.write(self.STDOUT_FILENO, data)

        if self.STDIN_FILENO in r:
            data = self.__interact_read(self.STDIN_FILENO)
            if input_filter:
                data = input_filter(data)
            i = -1
            if escape_character is not None:
                i = data.rfind(escape_character)
            if i != -1:
                # Send what precedes the escape character, then stop.
                data = data[:i]
                self.__interact_writen(self.child_fd, data)
                break
            self.__interact_writen(self.child_fd, data)
def interact(self, escape_character=chr(29), input_filter=None, output_filter=None):
    '''Give control of the child process to the interactive user (the human
    at the keyboard).

    Keystrokes are sent to the child process, and the stdout and stderr output
    of the child process is printed.  This simply echoes the child stdout and
    child stderr to the real stdout, and it echoes the real stdin to the child
    stdin.  When the user types the escape_character this method returns None.
    The escape_character is not transmitted.  The default escape_character is
    entered as ``Ctrl - ]``, the very same as BSD telnet.  To prevent
    escaping, escape_character may be set to None.

    If a logfile is specified, the data sent to and received from the child
    process in interact mode is duplicated to the given log.

    You may pass in optional input and output filter functions.  These
    functions should take a string and return a string.  The output_filter is
    passed all the output from the child process.  The input_filter is passed
    all the keyboard input from the user.  The input_filter is run BEFORE the
    check for the escape_character.

    Note that if you change the window size of the parent, the SIGWINCH signal
    is not passed through to the child.  If you want the child window size to
    change when the parent's window size changes, do something like the
    following example::

        import pexpect, struct, fcntl, termios, signal, sys
        def sigwinch_passthrough (sig, data):
            s = struct.pack("HHHH", 0, 0, 0, 0)
            a = struct.unpack('hhhh', fcntl.ioctl(sys.stdout.fileno(),
                termios.TIOCGWINSZ , s))
            global p
            p.setwinsize(a[0],a[1])
        # Note this 'p' global and used in sigwinch_passthrough.
        p = pexpect.spawn('/bin/bash')
        signal.signal(signal.SIGWINCH, sigwinch_passthrough)
        p.interact()
    '''
    # Flush the buffer.
    self.write_to_stdout(self.buffer)
    self.stdout.flush()
    self.buffer = self.string_type()

    mode = tty.tcgetattr(self.STDIN_FILENO)
    try:
        # Everything after the attributes are saved runs under the finally
        # clause, so a failure in setraw() or in encoding the escape character
        # cannot leave the terminal in raw mode.
        tty.setraw(self.STDIN_FILENO)
        if escape_character is not None and PY3:
            escape_character = escape_character.encode('latin-1')
        self.__interact_copy(escape_character, input_filter, output_filter)
    finally:
        tty.tcsetattr(self.STDIN_FILENO, tty.TCSAFLUSH, mode)