我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 pty.STDOUT_FILENO。
def do_execute(self, stdin_file, stdout_file, stderr_file, cgroup_file):
    """Fork a child that execs ``self.execute_file`` and wait for it.

    In the child, the working directory is changed to ``/in/package``, each
    standard stream whose file argument is truthy is redirected to that file,
    the cgroup is entered when ``cgroup_file`` is truthy, and finally
    ``self.execute_file`` is exec'ed with ``self.execute_args`` and
    ``SPAWN_ENV``.

    Args:
        stdin_file: Path opened read-only and installed as stdin, or a falsy
            value to leave stdin untouched.
        stdout_file: Path opened write-only and installed as stdout, or falsy.
        stderr_file: Path opened write-only and installed as stderr, or falsy.
        cgroup_file: Passed to ``enter_cgroup`` when truthy.

    Returns:
        Whatever ``wait_and_reap_zombies(pid)`` returns for the child.
    """
    # Imported before forking so the child does no import work of its own.
    from os import _exit

    pid = fork()
    if not pid:
        # Child. It must never fall back into the caller's code: if any setup
        # step or execve itself raises, terminate immediately instead of
        # letting the exception unwind into a duplicate of the parent.
        try:
            chdir('/in/package')
            redirections = (
                (stdin_file, O_RDONLY, STDIN_FILENO),
                (stdout_file, O_WRONLY, STDOUT_FILENO),
                (stderr_file, O_WRONLY, STDERR_FILENO),
            )
            for path, flags, target_fd in redirections:
                if path:
                    fd = os_open(path, flags)
                    dup2(fd, target_fd)
                    os_close(fd)
            if cgroup_file:
                enter_cgroup(cgroup_file)
            execve(self.execute_file, self.execute_args, SPAWN_ENV)
        finally:
            # Only reachable when the exec did not happen; 127 follows the
            # shell convention for "command could not be executed".
            _exit(127)
    return wait_and_reap_zombies(pid)
def test__copy_eof_on_all(self):
    """Check the empty-read (EOF) case on both master_fd and stdin."""
    stdout_reader, stdout_writer = self._pipe()
    pty.STDOUT_FILENO = stdout_writer
    stdin_reader, stdin_writer = self._pipe()
    pty.STDIN_FILENO = stdin_reader
    pair = socket.socketpair()
    masters = [sock.fileno() for sock in pair]
    self.fds.extend(masters)

    # Close the peer socket and the stdin write end.
    os.close(masters[1])
    pair[1].close()
    os.close(stdin_writer)

    # Two select calls are expected; the last one will cause IndexError.
    pty.select = self._mock_select
    self.select_rfds_lengths.append(2)
    self.select_rfds_results.append([stdin_reader, masters[0]])
    # Both fds should have been dropped from the fds list because each one
    # hit EOF before the second select call.
    self.select_rfds_lengths.append(0)

    with self.assertRaises(IndexError):
        pty._copy(masters[0])
def test__copy_eof_on_all(self):
    """Check the empty-read (EOF) case on both master_fd and stdin."""
    stdout_reader, stdout_writer = self._pipe()
    pty.STDOUT_FILENO = stdout_writer
    stdin_reader, stdin_writer = self._pipe()
    pty.STDIN_FILENO = stdin_reader
    pair = self._socketpair()
    masters = [sock.fileno() for sock in pair]

    # Close the peer socket and the stdin write end.
    os.close(masters[1])
    pair[1].close()
    os.close(stdin_writer)

    # Two select calls are expected; the last one will cause IndexError.
    pty.select = self._mock_select
    self.select_rfds_lengths.append(2)
    self.select_rfds_results.append([stdin_reader, masters[0]])
    # Both fds should have been dropped from the fds list because each one
    # hit EOF before the second select call.
    self.select_rfds_lengths.append(0)

    with self.assertRaises(IndexError):
        pty._copy(masters[0])
def __interact_copy(self, escape_character=None, input_filter=None, output_filter=None):
    """Shuttle data between the child and our stdin/stdout for interact().

    Loops while ``self.isalive()``: output read from ``self.child_fd`` is
    optionally passed through ``output_filter``, written to ``self.logfile``
    when one is set, and copied to ``self.STDOUT_FILENO``; input read from
    ``self.STDIN_FILENO`` is optionally passed through ``input_filter`` and
    forwarded to the child.

    Args:
        escape_character: When not None, the last occurrence of this value in
            the (filtered) input ends the loop; only the data before it is
            forwarded. When None, no escape detection is performed.
        input_filter: Optional callable applied to data read from stdin.
        output_filter: Optional callable applied to data read from the child.
    """
    while self.isalive():
        r, w, e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])
        if self.child_fd in r:
            data = self.__interact_read(self.child_fd)
            if output_filter:
                data = output_filter(data)
            if self.logfile is not None:
                self.logfile.write(data)
                self.logfile.flush()
            os.write(self.STDOUT_FILENO, data)
        if self.STDIN_FILENO in r:
            data = self.__interact_read(self.STDIN_FILENO)
            if input_filter:
                data = input_filter(data)
            # rfind(None) raises TypeError, so only search when an escape
            # character was actually supplied.
            i = -1
            if escape_character is not None:
                i = data.rfind(escape_character)
            if i != -1:
                # Forward what preceded the escape character, then stop.
                data = data[:i]
                self.__interact_writen(self.child_fd, data)
                break
            self.__interact_writen(self.child_fd, data)
def _signal_winch(self, signum, frame):
    """Forward the current window size of stdout to ``self.set_pty_size``.

    Takes the ``(signum, frame)`` pair of a signal handler; neither argument
    is used. Does nothing when ``self.set_pty_size`` is None.
    """
    if self.set_pty_size is None:
        return
    winsize = array.array('h', [0, 0, 0, 0])
    # The trailing True asks ioctl to fill ``winsize`` in place.
    fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, winsize, True)
    self.set_pty_size(*winsize)
def set_pty_size(self, p1, p2, p3, p4):
    """Apply a window size to the descriptor held in ``self.master``.

    The four values are packed, in order, as signed shorts and handed to the
    TIOCSWINSZ ioctl.
    """
    winsize = array.array('h', (p1, p2, p3, p4))
    fcntl.ioctl(self.master, termios.TIOCSWINSZ, winsize)
def __interact_copy(self, escape_character=None, input_filter=None, output_filter=None):
    """Relay data between the child and our stdin/stdout for interact().

    The loop runs while ``self.isalive()`` and ends early when the child
    reports EOF or when ``escape_character`` (if not None) appears in the
    filtered stdin data.
    """
    while self.isalive():
        readable, _, _ = select_ignore_interrupts(
            [self.child_fd, self.STDIN_FILENO], [], [])

        if self.child_fd in readable:
            try:
                chunk = self.__interact_read(self.child_fd)
            except OSError as err:
                if err.args[0] != errno.EIO:
                    raise
                # Linux-style EOF
                break
            if chunk == b'':
                # BSD-style EOF
                break
            if output_filter:
                chunk = output_filter(chunk)
            self._log(chunk, 'read')
            os.write(self.STDOUT_FILENO, chunk)

        if self.STDIN_FILENO in readable:
            chunk = self.__interact_read(self.STDIN_FILENO)
            if input_filter:
                chunk = input_filter(chunk)
            if escape_character is None:
                cut = -1
            else:
                cut = chunk.rfind(escape_character)
            if cut == -1:
                self._log(chunk, 'send')
                self.__interact_writen(self.child_fd, chunk)
                continue
            # Escape seen: forward only what came before it, then stop.
            chunk = chunk[:cut]
            if chunk:
                self._log(chunk, 'send')
            self.__interact_writen(self.child_fd, chunk)
            break
def do_build(self, output_file, cgroup_file):
    """Fork a child that execs ``self.compiler_file`` and wait for it.

    In the child, the working directory is changed to ``/out``, stdin is
    closed, stdout and stderr are both redirected to ``output_file`` when it
    is truthy, the cgroup is entered when ``cgroup_file`` is truthy, and
    ``self.compiler_file`` is exec'ed with ``self.compiler_args`` and
    ``SPAWN_ENV``.

    Args:
        output_file: Path opened write-only to receive both stdout and stderr,
            or a falsy value to leave them untouched.
        cgroup_file: Passed to ``enter_cgroup`` when truthy.

    Returns:
        Whatever ``wait_and_reap_zombies(pid)`` returns for the child.
    """
    # Imported before forking so the child does no import work of its own.
    from os import _exit

    pid = fork()
    if not pid:
        # Child. It must never fall back into the caller's code: if any setup
        # step or execve itself raises, terminate immediately instead of
        # letting the exception unwind into a duplicate of the parent.
        try:
            chdir('/out')
            os_close(STDIN_FILENO)
            if output_file:
                fd = os_open(output_file, O_WRONLY)
                dup2(fd, STDOUT_FILENO)
                dup2(fd, STDERR_FILENO)
                os_close(fd)
            if cgroup_file:
                enter_cgroup(cgroup_file)
            execve(self.compiler_file, self.compiler_args, SPAWN_ENV)
        finally:
            # Only reachable when the exec did not happen; 127 follows the
            # shell convention for "command could not be executed".
            _exit(127)
    return wait_and_reap_zombies(pid)
def setUp(self):
    """Remember the pty module globals and start with empty bookkeeping."""
    # Saved originals of the pty attributes, so they can be put back later.
    self.orig_stdin_fileno, self.orig_stdout_fileno = (
        pty.STDIN_FILENO, pty.STDOUT_FILENO)
    self.orig_pty_select = pty.select

    # File descriptors to close.
    self.fds = []
    self.select_rfds_lengths = []
    self.select_rfds_results = []
def tearDown(self):
    """Restore the patched pty globals and close every registered fd.

    Closing is best-effort: a descriptor that is already closed (or otherwise
    fails to close with OSError) is skipped. Other exceptions, including
    KeyboardInterrupt and SystemExit, are no longer swallowed.
    """
    pty.STDIN_FILENO = self.orig_stdin_fileno
    pty.STDOUT_FILENO = self.orig_stdout_fileno
    pty.select = self.orig_pty_select
    for fd in self.fds:
        try:
            os.close(fd)
        except OSError:
            # Already closed elsewhere; nothing left to clean up.
            pass
def test__copy_to_each(self):
    """Check the normal data case on both master_fd and stdin."""
    stdout_reader, stdout_writer = self._pipe()
    pty.STDOUT_FILENO = stdout_writer
    stdin_reader, stdin_writer = self._pipe()
    pty.STDIN_FILENO = stdin_reader
    pair = socket.socketpair()
    masters = [sock.fileno() for sock in pair]
    self.fds.extend(masters)

    # Feed data. Both payloads are smaller than PIPEBUF, so these writes
    # will not block.
    os.write(masters[1], b'from master')
    os.write(stdin_writer, b'from stdin')

    # Two select calls are expected; the last one will cause IndexError.
    pty.select = self._mock_select
    self.select_rfds_lengths.append(2)
    self.select_rfds_results.append([stdin_reader, masters[0]])
    self.select_rfds_lengths.append(2)

    with self.assertRaises(IndexError):
        pty._copy(masters[0])

    # The right data must have gone to the right places.
    ready = select.select([stdout_reader, masters[1]], [], [], 0)[0]
    self.assertEqual([stdout_reader, masters[1]], ready)
    self.assertEqual(os.read(stdout_reader, 20), b'from master')
    self.assertEqual(os.read(masters[1], 20), b'from stdin')
def setUp(self):
    """Remember the pty module globals and start with empty bookkeeping."""
    # Saved originals of the pty attributes, so they can be put back later.
    self.orig_stdin_fileno, self.orig_stdout_fileno = (
        pty.STDIN_FILENO, pty.STDOUT_FILENO)
    self.orig_pty_select = pty.select

    # File descriptors to close.
    self.fds = []
    self.files = []
    self.select_rfds_lengths = []
    self.select_rfds_results = []
def tearDown(self):
    """Restore the patched pty globals, then close registered files and fds.

    Closing is best-effort: an OSError from any individual close is ignored.
    """
    pty.STDIN_FILENO, pty.STDOUT_FILENO = (
        self.orig_stdin_fileno, self.orig_stdout_fileno)
    pty.select = self.orig_pty_select

    for handle in self.files:
        try:
            handle.close()
        except OSError:
            pass

    for descriptor in self.fds:
        try:
            os.close(descriptor)
        except OSError:
            pass
def test__copy_to_each(self):
    """Check the normal data case on both master_fd and stdin."""
    stdout_reader, stdout_writer = self._pipe()
    pty.STDOUT_FILENO = stdout_writer
    stdin_reader, stdin_writer = self._pipe()
    pty.STDIN_FILENO = stdin_reader
    pair = self._socketpair()
    masters = [sock.fileno() for sock in pair]

    # Feed data. Both payloads are smaller than PIPEBUF, so these writes
    # will not block.
    os.write(masters[1], b'from master')
    os.write(stdin_writer, b'from stdin')

    # Two select calls are expected; the last one will cause IndexError.
    pty.select = self._mock_select
    self.select_rfds_lengths.append(2)
    self.select_rfds_results.append([stdin_reader, masters[0]])
    self.select_rfds_lengths.append(2)

    with self.assertRaises(IndexError):
        pty._copy(masters[0])

    # The right data must have gone to the right places.
    ready = select.select([stdout_reader, masters[1]], [], [], 0)[0]
    self.assertEqual([stdout_reader, masters[1]], ready)
    self.assertEqual(os.read(stdout_reader, 20), b'from master')
    self.assertEqual(os.read(masters[1], 20), b'from stdin')
def _write_stdout(self, data):
    """Write ``data`` (which must be ``bytes``) to the stdout descriptor."""
    assert isinstance(data, bytes), '`data` must be a bytes'
    stdout_fd = pty.STDOUT_FILENO
    os.write(stdout_fd, data)
def _set_pty_size(self):
    """Copy the window size of our stdout terminal onto the child pty.

    Requires ``self.master_fd`` to be set.
    """
    assert self.master_fd is not None

    winsize = array.array('h', [0, 0, 0, 0])
    # Read the size of the real terminal (filled into ``winsize`` in place)...
    fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, winsize, True)
    # ...and apply it to the pseudoterminal.
    fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, winsize)
def _set_pty_size(self):
    """Set the window size of the child pty from ``self.size``.

    Requires ``self.master_fd`` to be set. The real terminal is not queried.
    """
    assert self.master_fd is not None

    rows, cols = self.size
    # NOTE(review): the values are packed as (cols, rows, 0, 0), while
    # struct winsize starts with the row count -- confirm the ordering of
    # self.size against its producer.
    winsize = array.array('h', [cols, rows, 0, 0])
    fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, winsize)
def write_stdout(self, data):
    """Hook for output written by the child process; currently a no-op.

    ``data`` is accepted and ignored.
    """
    # Forwarding to the real stdout is disabled here:
    # os.write(pty.STDOUT_FILENO, data)
    return None
def __fork_pty(self):
    """Substitute for the forkpty system call.

    Intended to be more portable than pty.fork(); specifically, this should
    work on Solaris.

    Modified 10.06.05 by Geoff Marshall: implemented __fork_pty() to resolve
    the issue with Python's pty.fork() not supporting Solaris, particularly
    ssh. Based on a patch to posixmodule.c authored by Noah Spurrier::

        http://mail.python.org/pipermail/python-dev/2003-May/035281.html

    Returns:
        ``(pid, parent_fd)`` in both the parent and the child.
    """
    parent_fd, child_fd = os.openpty()
    if min(parent_fd, child_fd) < 0:
        raise ExceptionPexpect("Could not open with os.openpty().")

    pid = os.fork()
    if pid != pty.CHILD:
        # Parent: only the parent side of the pty is needed here.
        os.close(child_fd)
        return pid, parent_fd

    # Child: drop the parent side, take the pty as controlling terminal and
    # wire it up as stdin, stdout and stderr.
    os.close(parent_fd)
    self.__pty_make_controlling_tty(child_fd)
    for std_fd in (self.STDIN_FILENO, self.STDOUT_FILENO, self.STDERR_FILENO):
        os.dup2(child_fd, std_fd)
    return pid, parent_fd
def __interact_copy(self, escape_character=None, input_filter=None, output_filter=None):
    """Shuttle data between the child and our stdin/stdout for interact().

    Loops while ``self.isalive()``: output read from ``self.child_fd`` is
    optionally passed through ``output_filter``, written to ``self.logfile``
    when one is set, and copied to ``self.STDOUT_FILENO``; input read from
    ``self.STDIN_FILENO`` is optionally passed through ``input_filter`` and
    forwarded to the child. The loop also ends when the child reports EOF.

    Args:
        escape_character: When not None, the last occurrence of this value in
            the (filtered) input ends the loop; only the data before it is
            forwarded. When None, no escape detection is performed.
        input_filter: Optional callable applied to data read from stdin.
        output_filter: Optional callable applied to data read from the child.

    Raises:
        OSError: Re-raised from reading the child when its errno is not EIO.
    """
    while self.isalive():
        r, w, e = self.__select([self.child_fd, self.STDIN_FILENO], [], [])
        if self.child_fd in r:
            try:
                data = self.__interact_read(self.child_fd)
            except OSError as err:
                if err.args[0] == errno.EIO:
                    # Linux-style EOF
                    break
                raise
            if data == b'':
                # BSD-style EOF
                break
            if output_filter:
                data = output_filter(data)
            if self.logfile is not None:
                self.logfile.write(data)
                self.logfile.flush()
            os.write(self.STDOUT_FILENO, data)
        if self.STDIN_FILENO in r:
            data = self.__interact_read(self.STDIN_FILENO)
            if input_filter:
                data = input_filter(data)
            # rfind(None) raises TypeError, so only search when an escape
            # character was actually supplied.
            i = -1
            if escape_character is not None:
                i = data.rfind(escape_character)
            if i != -1:
                # Forward what preceded the escape character, then stop.
                data = data[:i]
                self.__interact_writen(self.child_fd, data)
                break
            self.__interact_writen(self.child_fd, data)
def _set_pty_size(self):
    """Mirror the window size of our stdout terminal onto the child pty.

    Requires ``self.master_fd`` to be set.
    """
    assert self.master_fd is not None

    size_fields = array.array('h', [0] * 4)
    # Query the real terminal; ioctl fills ``size_fields`` in place.
    fcntl.ioctl(pty.STDOUT_FILENO, termios.TIOCGWINSZ, size_fields, True)
    # Push the same size to the pseudoterminal.
    fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, size_fields)
def write_stdout(self, data):
    """Emit ``data`` (bytes) on stdout and tee a sanitized copy to the buffer.

    The raw bytes go to the real terminal first; the result of
    ``self._sanatize_data(data)`` is appended to ``self.buffer`` unless it is
    empty.
    """
    # Raw bytes go to the real terminal.
    os.write(pty.STDOUT_FILENO, data)

    # Tee the sanitized form to the buffer.
    cleaned = self._sanatize_data(data)
    if len(cleaned) == 0:
        return
    self.buffer.write(cleaned)