Python pty 模块,fork() 实例源码

我们从Python开源项目中,提取了以下37个代码示例,用于说明如何使用pty.fork()

项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def run_bg(cmd, debug=False, cwd=''):
    """Start ``cmd`` in the background on a new pseudo-terminal.

    Args:
        cmd: The command, either as a list of strings or as one string that
            is split on whitespace.
        debug: When true, the command line is echoed to stderr before forking.
        cwd: Directory the child changes into before exec'ing; ``''`` keeps
            the current directory.

    Returns:
        The ``(child_pid, child_fd)`` pair from ``pty.fork()`` (parent side).

    Raises:
        RunError: If ``pty.fork()`` fails, or -- inside the child -- if the
            ``chdir``/``execvp`` step fails.
    """
    # make sure 'cmd' is a list of strings; split() without arguments never
    # produces empty items, so no extra filtering is required
    if isinstance(cmd, (str, unicode)):
        cmd = cmd.split()
    if debug:
        sys.stderr.write(' '.join(cmd)+'\n')
        sys.stderr.flush()

    try:
        child_pid, child_fd = pty.fork()
    except OSError as e:
        raise RunError(cmd, None, message='pty.fork() failed: %s' % str(e))
    if child_pid == 0:
        try:
            if cwd != '':
                os.chdir(cwd)
            os.execvp(cmd[0], cmd)
        except Exception as e:
            # NOTE(review): this is raised inside the forked child, so it
            # unwinds through the child's copy of the caller's stack --
            # confirm the callers expect that.
            raise RunError(cmd, None, 'os.execvp() failed: %s' % str(e))
    else:
        return child_pid, child_fd
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def run_bg(cmd, debug=False, cwd=''):
    """Start ``cmd`` in the background on a new pseudo-terminal.

    Args:
        cmd: The command, either as a list of strings or as one string that
            is split on whitespace.
        debug: When true, the command line is echoed to stderr before forking.
        cwd: Directory the child changes into before exec'ing; ``''`` keeps
            the current directory.

    Returns:
        The ``(child_pid, child_fd)`` pair from ``pty.fork()`` (parent side).

    Raises:
        RunError: If ``pty.fork()`` fails, or -- inside the child -- if the
            ``chdir``/``execvp`` step fails.
    """
    # make sure 'cmd' is a list of strings; split() without arguments never
    # produces empty items, so no extra filtering is required
    if isinstance(cmd, (str, unicode)):
        cmd = cmd.split()
    if debug:
        sys.stderr.write(' '.join(cmd)+'\n')
        sys.stderr.flush()

    try:
        child_pid, child_fd = pty.fork()
    except OSError as e:
        raise RunError(cmd, None, message='pty.fork() failed: %s' % str(e))
    if child_pid == 0:
        try:
            if cwd != '':
                os.chdir(cwd)
            os.execvp(cmd[0], cmd)
        except Exception as e:
            # NOTE(review): this is raised inside the forked child, so it
            # unwinds through the child's copy of the caller's stack --
            # confirm the callers expect that.
            raise RunError(cmd, None, 'os.execvp() failed: %s' % str(e))
    else:
        return child_pid, child_fd
项目:rogueinabox    作者:rogueinabox    | 项目源码 | 文件源码
def open_terminal(command="bash", columns=80, lines=24):
    """Run ``command`` on a new pseudo-terminal and return handles to it.

    Args:
        command: Shell-style command line; it is tokenised with
            ``shlex.split`` and exec'ed in the child.
        columns: Terminal width, exported to the child as ``COLUMNS``.
        lines: Terminal height, exported to the child as ``LINES``.

    Returns:
        A ``(Terminal(columns, lines), child_pid, master_file)`` tuple where
        ``master_file`` is an unbuffered binary file object wrapping the
        non-blocking master side of the pty.
    """
    p_pid, master_fd = pty.fork()
    if p_pid == 0:  # Child.
        path, *args = shlex.split(command)
        args = [path] + args
        env = dict(TERM="linux", LC_ALL="en_GB.UTF-8",
                   COLUMNS=str(columns), LINES=str(lines))
        try:
            os.execvpe(path, args, env)
        except FileNotFoundError:
            # flush explicitly: os._exit() skips the normal stdio flushing
            print("Could not find the executable in %s. Press any key to exit." % path,
                  flush=True)
            # Leave the forked child immediately; a SystemExit from exit()
            # would unwind through the frames inherited from the parent.
            os._exit(1)

    # set non blocking read: the file *status* flags (F_GETFL) must be read
    # here, not the descriptor flags (F_GETFD), because they are written
    # back with F_SETFL
    flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    # File-like object for I/O with the child process aka command.
    p_out = os.fdopen(master_fd, "w+b", 0)
    return Terminal(columns, lines), p_pid, p_out
项目:remote-docker    作者:phizaz    | 项目源码 | 文件源码
def spawn(self, argv):
        '''
        Run ``argv`` on a new pseudo-terminal and return the decoded log.

        Modelled on pty.spawn().  ``argv`` must be a list and no previous
        master descriptor may be registered on this object.  The result is
        ``self.log.decode()``.
        '''
        assert self.master_fd is None
        assert isinstance(argv, list)

        child_pid, pty_fd = pty.fork()
        self.master_fd = pty_fd

        if child_pid == pty.CHILD:
            # replaces the child's process image; only "returns" by raising
            os.execvp(argv[0], argv)

        self._init()
        try:
            # start communication
            self._copy()
        except Exception:
            # unexpected errors: clean up, then let the error propagate
            self._del()
            raise
        else:
            self._del()

        return self.log.decode()
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def main ():
    signal.signal (signal.SIGCHLD, signal_handler)
    pid, fd = pty.fork()
    if pid == 0:
        os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)
    nonblock (fd)
    tty.setraw(fd) #STDIN_FILENO)
    print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    print 'Entering to sleep...'
    try:
        time.sleep(2)
    except:
        print 'Sleep interrupted'
    try:
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
#       print str(e)
    print
    print 'Reading from master fd:', os.read (fd, 1000)
项目:jack    作者:jack-cli-cd-ripper    | 项目源码 | 文件源码
def start_new_process(args, nice_value=0):
    """Start ``args`` in a pty, renice it, and return a bookkeeping dict.

    The parent gets back a dict with the keys ``start_time``, ``pid``,
    ``fd``, ``file``, ``cmd``, ``buf``, ``otf``, ``percent`` and
    ``elapsed``; the child exec's the command and does not return.
    """
    started = time.time()
    child_pid, pty_fd = pty.fork()
    if child_pid == CHILD:
        default_signals()
        if nice_value:
            os.nice(nice_value)
        os.execvp(args[0], [arg.encode(cf['_charset'], "replace") for arg in args])
    else:
        # the master side is switched to non-blocking on Linux only
        if os.uname()[0] == "Linux":
            fcntl.fcntl(pty_fd, F_SETFL, O_NONBLOCK)
        return {
            'start_time': started,
            'pid': child_pid,
            'fd': pty_fd,
            'file': os.fdopen(pty_fd),
            'cmd': args,
            'buf': "",
            'otf': 0,
            'percent': 0,
            'elapsed': 0,
        }
项目:Adwear    作者:Uberi    | 项目源码 | 文件源码
def spawn(self):
        """Fork onto a pty and run ``self.command`` in the child.

        A callable command is invoked in-process (its traceback, if any, is
        written to stderr) and the child then exits with status 0; anything
        else is exec'ed with ``self.env`` plus ``TERM=linux``.  The parent
        stores pid/master fd on ``self`` and registers ``self.terminate``
        with ``atexit``.
        """
        child_env = self.env
        child_env['TERM'] = 'linux'

        self.pid, self.master = pty.fork()

        if self.pid == 0:
            if not callable(self.command):
                os.execvpe(self.command[0], self.command, child_env)
            else:
                try:
                    self.command()
                except:
                    sys.stderr.write(traceback.format_exc())
                    sys.stderr.flush()
                finally:
                    # never fall back into the parent's code path
                    os._exit(0)

        # the master is only made non-blocking when no main loop is attached
        if self.main_loop is None:
            fcntl.fcntl(self.master, fcntl.F_SETFL, os.O_NONBLOCK)

        atexit.register(self.terminate)
项目:tools    作者:InfraSIM    | 项目源码 | 文件源码
def main ():
    signal.signal (signal.SIGCHLD, signal_handler)
    pid, fd = pty.fork()
    if pid == 0:
        os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)
    nonblock (fd)
    tty.setraw(fd) #STDIN_FILENO)
    print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    print 'Entering to sleep...'
    try:
        time.sleep(2)
    except:
        print 'Sleep interrupted'
    try:
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
#       print str(e)
    print
    print 'Reading from master fd:', os.read (fd, 1000)
项目:TerminalView    作者:Wramberg    | 项目源码 | 文件源码
def __init__(self, cmd, cwd):
        """Fork a pty; the child exec's ``cmd`` from directory ``cwd``.

        ``cmd`` is an argv-style sequence whose first item is the path of
        the executable.  The parent keeps the child's pid and the master fd.
        """
        self._cmd_return_code = self._cmd_kill_signal = 0
        fork_result = pty.fork()
        self._shell_pid, self._master_fd = fork_result
        if self._shell_pid != pty.CHILD:
            # parent: nothing more to set up here
            return
        os.environ["TERM"] = "linux"
        os.chdir(cwd)
        os.execv(cmd[0], cmd)
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def restart(self, cmd):
        """Stop the current process (if still running) and start ``cmd`` anew.

        The child runs ``cmd`` through ``subprocess.call`` on a fresh pty and
        reports "t"/"f" on a pipe before exiting; the parent records the new
        pid and descriptors and starts a new ``TerminalUpdateThread``.
        """
        if not self.completed:
            self.process_input("\n\033[01;31mProcess killed.\033[00m\r\n")
            os.kill(self.pid, signal.SIGHUP)
            worker = self.thread
            worker.stop()
            # keep processing Qt events until the old thread reports it is done
            while worker.alive:
                QCoreApplication.processEvents()

        self.exit_pipe, child_end = os.pipe()
        child_pid, pty_fd = pty.fork()
        if child_pid == 0:
            try:
                os.environ["TERM"] = "xterm-256color"
                status = subprocess.call(cmd, close_fds=True)
                os.write(2, "\033[01;34mProcess has completed.\033[00m\n")
                os.write(child_end, "t")
                os._exit(status)
            except:
                pass
            # only reached when the block above raised
            os.write(2, "\033[01;31mCommand '" + cmd[0] + "' failed to execute.\033[00m\n")
            os.write(child_end, "f")
            os._exit(1)

        os.close(child_end)
        self.process_input("\033[01;34mStarted process with PID %d.\033[00m\r\n" % child_pid)

        self.pid = child_pid
        self.data_pipe = pty_fd
        self.completed = False

        # Initialize terminal settings
        window_size = struct.pack("hhhh", self.rows, self.cols, 0, 0)
        fcntl.ioctl(self.data_pipe, termios.TIOCSWINSZ, window_size)
        attrs = termios.tcgetattr(self.data_pipe)
        termios.tcsetattr(self.data_pipe, termios.TCSAFLUSH, attrs)

        self.thread = TerminalUpdateThread(self, self.data_pipe, self.exit_pipe)
        self.thread.start()
项目:dataplicity-agent    作者:wildfoundry    | 项目源码 | 文件源码
def spawn(self, argv=None):
        '''
        Create a spawned process.
        Based on the code for pty.spawn().

        In the child the optional ``self.group`` / ``self.user`` are applied
        and ``argv`` (default: ``[$SHELL]``) is exec'ed.  The parent copies
        data via ``self._copy()`` until it raises IOError/OSError, then
        closes the master descriptor.
        '''
        assert self.master_fd is None

        pid, master_fd = pty.fork()
        self.master_fd = master_fd
        self.pid = pid
        if pid == pty.CHILD:
            # The group must be switched BEFORE the user: once setuid() has
            # dropped root, the process is no longer allowed to call setgid().
            # NOTE(review): supplementary groups are left untouched here.
            if self.group is not None:
                try:
                    gid = grp.getgrnam(self.group).gr_gid
                except KeyError:
                    log.error("No such group: %s", self.group)
                else:
                    os.setgid(gid)
                    log.debug('switched group for remote process to %s(%s)', self.group, gid)
            if self.user is not None:
                try:
                    uid = pwd.getpwnam(self.user).pw_uid
                except KeyError:
                    log.error("No such user: %s", self.user)
                else:
                    os.setuid(uid)
                    log.debug('switched user for remote process to %s(%s)', self.user, uid)
            if not argv:
                argv = [os.environ['SHELL']]
            os.execlp(argv[0], *argv)
            # Previous command replaces the process
            return

        self._init_fd()
        try:
            self._copy()
        except (IOError, OSError):
            # deliberate best-effort: an I/O error simply ends the copy loop
            pass

        os.close(master_fd)
        self.master_fd = None
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def __fork_pty(self):
        '''This implements a substitute for the forkpty system call. This
        should be more portable than the pty.fork() function. Specifically,
        this should work on Solaris.

        Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
        resolve the issue with Python's pty.fork() not supporting Solaris,
        particularly ssh. Based on patch to posixmodule.c authored by Noah
        Spurrier::

            http://mail.python.org/pipermail/python-dev/2003-May/035281.html

        Returns a ``(pid, parent_fd)`` tuple in both processes; ``pid`` is
        ``pty.CHILD`` in the child.  Raises ExceptionPexpect if the pty
        descriptors look invalid.
        '''

        parent_fd, child_fd = os.openpty()
        if parent_fd < 0 or child_fd < 0:
            raise ExceptionPexpect("Could not open with os.openpty().")

        pid = os.fork()
        if pid == pty.CHILD:
            # Child.
            os.close(parent_fd)
            self.__pty_make_controlling_tty(child_fd)

            std_fds = (self.STDIN_FILENO, self.STDOUT_FILENO,
                       self.STDERR_FILENO)
            for std_fd in std_fds:
                os.dup2(child_fd, std_fd)

            # The slave side now lives on the standard descriptors; close the
            # original so it is not leaked into the exec'ed program.
            if child_fd not in std_fds:
                os.close(child_fd)

        else:
            # Parent.
            os.close(child_fd)

        return pid, parent_fd
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def __pty_make_controlling_tty(self, tty_fd):
        '''This makes the pseudo-terminal the controlling tty. This should be
        more portable than the pty.fork() function. Specifically, this should
        work on Solaris.

        ``tty_fd`` is an open descriptor of the pty's child side; its device
        name is looked up with os.ttyname().  Raises OSError if any of the
        open/close calls fails unexpectedly, and ExceptionPexpect if
        /dev/tty can still be opened after os.setsid().
        '''

        child_name = os.ttyname(tty_fd)

        # Disconnect from controlling tty, if any.  Raises OSError of ENXIO
        # if there was no controlling tty to begin with, such as when
        # executed by a cron(1) job.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            os.close(fd)
        except OSError as err:
            # ENXIO is tolerated; every other error is re-raised.
            if err.errno != errno.ENXIO:
                raise

        # Start a new session.
        os.setsid()

        # Verify we are disconnected from controlling tty by attempting to open
        # it again.  We expect that OSError of ENXIO should always be raised.
        # The ExceptionPexpect raised inside the try block is not handled by
        # the `except OSError` clause below (presumably it is not an OSError
        # subclass -- its definition is outside this view).
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            os.close(fd)
            raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        # Verify we can open child pty.
        # NOTE(review): this open omits O_NOCTTY; presumably this is the step
        # that makes the pty the controlling tty -- confirm.
        fd = os.open(child_name, os.O_RDWR)
        os.close(fd)

        # Verify we now have a controlling tty.
        fd = os.open("/dev/tty", os.O_WRONLY)
        os.close(fd)
项目:obsoleted-vpduserv    作者:InfraSIM    | 项目源码 | 文件源码
def sig_test (sig_handler_type, fork_type, child_output):
    print 'Testing with:'
    print '\tsig_handler_type:', sig_handler_type
    print '\tfork_type:', fork_type
    print '\tchild_output:', child_output

    if sig_handler_type == 'SIG_IGN':
        signal.signal (signal.SIGCHLD, signal.SIG_IGN)
    else:
        signal.signal (signal.SIGCHLD, signal_handler)
    pid = -1
    fd = -1
    if fork_type == 'ptyfork':
        pid, fd = pty.fork()
    else:
        pid = os.fork()

    if pid == 0:
        if child_output == 'yes':
            os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)

    #print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    #print 'Entering to sleep...'
    try:
        time.sleep(2)
    except:
        pass
    try:
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
#       print str(e)
    print
项目:ssh-tunnel    作者:aalku    | 项目源码 | 文件源码
def __fork_pty(self):
        '''This implements a substitute for the forkpty system call. This
        should be more portable than the pty.fork() function. Specifically,
        this should work on Solaris.

        Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
        resolve the issue with Python's pty.fork() not supporting Solaris,
        particularly ssh. Based on patch to posixmodule.c authored by Noah
        Spurrier::

            http://mail.python.org/pipermail/python-dev/2003-May/035281.html

        Returns a ``(pid, parent_fd)`` tuple in both processes; ``pid`` is
        ``pty.CHILD`` in the child.  Raises ExceptionPexpect if the pty
        descriptors look invalid.
        '''

        parent_fd, child_fd = os.openpty()
        if parent_fd < 0 or child_fd < 0:
            raise ExceptionPexpect("Could not open with os.openpty().")

        pid = os.fork()
        if pid == pty.CHILD:
            # Child.
            os.close(parent_fd)
            self.__pty_make_controlling_tty(child_fd)

            std_fds = (self.STDIN_FILENO, self.STDOUT_FILENO,
                       self.STDERR_FILENO)
            for std_fd in std_fds:
                os.dup2(child_fd, std_fd)

            # The slave side now lives on the standard descriptors; close the
            # original so it is not leaked into the exec'ed program.
            if child_fd not in std_fds:
                os.close(child_fd)

        else:
            # Parent.
            os.close(child_fd)

        return pid, parent_fd
项目:ssh-tunnel    作者:aalku    | 项目源码 | 文件源码
def __pty_make_controlling_tty(self, tty_fd):
        '''This makes the pseudo-terminal the controlling tty. This should be
        more portable than the pty.fork() function. Specifically, this should
        work on Solaris.

        ``tty_fd`` is an open descriptor of the pty's child side; its device
        name is looked up with os.ttyname().  Raises OSError if any of the
        open/close calls fails unexpectedly, and ExceptionPexpect if
        /dev/tty can still be opened after os.setsid().
        '''

        child_name = os.ttyname(tty_fd)

        # Disconnect from controlling tty, if any.  Raises OSError of ENXIO
        # if there was no controlling tty to begin with, such as when
        # executed by a cron(1) job.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            os.close(fd)
        except OSError as err:
            # ENXIO is tolerated; every other error is re-raised.
            if err.errno != errno.ENXIO:
                raise

        # Start a new session.
        os.setsid()

        # Verify we are disconnected from controlling tty by attempting to open
        # it again.  We expect that OSError of ENXIO should always be raised.
        # The ExceptionPexpect raised inside the try block is not handled by
        # the `except OSError` clause below (presumably it is not an OSError
        # subclass -- its definition is outside this view).
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            os.close(fd)
            raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        # Verify we can open child pty.
        # NOTE(review): this open omits O_NOCTTY; presumably this is the step
        # that makes the pty the controlling tty -- confirm.
        fd = os.open(child_name, os.O_RDWR)
        os.close(fd)

        # Verify we now have a controlling tty.
        fd = os.open("/dev/tty", os.O_WRONLY)
        os.close(fd)
项目:gits    作者:tolstoyevsky    | 项目源码 | 文件源码
def _create(self, rows=24, cols=80):
        """Fork a login session on a new pty and register it as a client.

        The child exec's ``/bin/login`` when running as uid 0, otherwise it
        prompts for a login name and exec's ``ssh`` to localhost.  The parent
        makes the master non-blocking, sets the window size, records the
        session in ``TermSocketHandler.clients`` and returns the master fd.
        """
        child_pid, master_fd = pty.fork()
        if child_pid != 0:
            # Parent side.
            fcntl.fcntl(master_fd, fcntl.F_SETFL, os.O_NONBLOCK)
            window_size = struct.pack('HHHH', rows, cols, 0, 0)
            fcntl.ioctl(master_fd, termios.TIOCSWINSZ, window_size)
            TermSocketHandler.clients[master_fd] = {
                'client': self,
                'pid': child_pid,
                'terminal': Terminal(rows, cols)
            }
            return master_fd

        # Child side from here on.
        if os.getuid() == 0:
            argv = ['/bin/login']
        else:
            # The prompt has to end with a newline character.
            sys.stdout.write(socket.gethostname() + ' login: \n')
            login = sys.stdin.readline().strip()

            argv = ['ssh']
            argv.append('-oPreferredAuthentications=keyboard-interactive,password')
            argv.append('-oNoHostAuthenticationForLocalhost=yes')
            argv.append('-oLogLevel=FATAL')
            argv.append('-F/dev/null')
            argv.extend(['-l', login, 'localhost'])

        child_env = {
            'COLUMNS': str(cols),
            'LINES': str(rows),
            'PATH': os.environ['PATH'],
            'TERM': 'linux',
        }
        os.execvpe(argv[0], argv, child_env)
项目:tools    作者:InfraSIM    | 项目源码 | 文件源码
def __fork_pty(self):
        '''This implements a substitute for the forkpty system call. This
        should be more portable than the pty.fork() function. Specifically,
        this should work on Solaris.

        Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
        resolve the issue with Python's pty.fork() not supporting Solaris,
        particularly ssh. Based on patch to posixmodule.c authored by Noah
        Spurrier::

            http://mail.python.org/pipermail/python-dev/2003-May/035281.html

        Returns a ``(pid, parent_fd)`` tuple in both processes; ``pid`` is
        ``pty.CHILD`` in the child.  Raises ExceptionPexpect if the pty
        descriptors look invalid.
        '''

        parent_fd, child_fd = os.openpty()
        if parent_fd < 0 or child_fd < 0:
            raise ExceptionPexpect("Could not open with os.openpty().")

        pid = os.fork()
        if pid == pty.CHILD:
            # Child.
            os.close(parent_fd)
            self.__pty_make_controlling_tty(child_fd)

            std_fds = (self.STDIN_FILENO, self.STDOUT_FILENO,
                       self.STDERR_FILENO)
            for std_fd in std_fds:
                os.dup2(child_fd, std_fd)

            # The slave side now lives on the standard descriptors; close the
            # original so it is not leaked into the exec'ed program.
            if child_fd not in std_fds:
                os.close(child_fd)

        else:
            # Parent.
            os.close(child_fd)

        return pid, parent_fd
项目:tools    作者:InfraSIM    | 项目源码 | 文件源码
def __pty_make_controlling_tty(self, tty_fd):
        '''This makes the pseudo-terminal the controlling tty. This should be
        more portable than the pty.fork() function. Specifically, this should
        work on Solaris.

        ``tty_fd`` is an open descriptor of the pty's child side; its device
        name is looked up with os.ttyname().  Raises OSError if any of the
        open/close calls fails unexpectedly, and ExceptionPexpect if
        /dev/tty can still be opened after os.setsid().
        '''

        child_name = os.ttyname(tty_fd)

        # Disconnect from controlling tty, if any.  Raises OSError of ENXIO
        # if there was no controlling tty to begin with, such as when
        # executed by a cron(1) job.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            os.close(fd)
        except OSError as err:
            # ENXIO is tolerated; every other error is re-raised.
            if err.errno != errno.ENXIO:
                raise

        # Start a new session.
        os.setsid()

        # Verify we are disconnected from controlling tty by attempting to open
        # it again.  We expect that OSError of ENXIO should always be raised.
        # The ExceptionPexpect raised inside the try block is not handled by
        # the `except OSError` clause below (presumably it is not an OSError
        # subclass -- its definition is outside this view).
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
            os.close(fd)
            raise ExceptionPexpect("OSError of errno.ENXIO should be raised.")
        except OSError as err:
            if err.errno != errno.ENXIO:
                raise

        # Verify we can open child pty.
        # NOTE(review): this open omits O_NOCTTY; presumably this is the step
        # that makes the pty the controlling tty -- confirm.
        fd = os.open(child_name, os.O_RDWR)
        os.close(fd)

        # Verify we now have a controlling tty.
        fd = os.open("/dev/tty", os.O_WRONLY)
        os.close(fd)
项目:tools    作者:InfraSIM    | 项目源码 | 文件源码
def sig_test (sig_handler_type, fork_type, child_output):
    print 'Testing with:'
    print '\tsig_handler_type:', sig_handler_type
    print '\tfork_type:', fork_type
    print '\tchild_output:', child_output

    if sig_handler_type == 'SIG_IGN':
        signal.signal (signal.SIGCHLD, signal.SIG_IGN)
    else:
        signal.signal (signal.SIGCHLD, signal_handler)
    pid = -1
    fd = -1
    if fork_type == 'ptyfork':
        pid, fd = pty.fork()
    else:
        pid = os.fork()

    if pid == 0:
        if child_output == 'yes':
            os.write (sys.stdout.fileno(), 'This is a test.\nThis is a test.')
        time.sleep(10000)

    #print 'Sending SIGKILL to child pid:', pid
    time.sleep(2)
    os.kill (pid, signal.SIGKILL)

    #print 'Entering to sleep...'
    try:
        time.sleep(2)
    except:
        pass
    try:
        os.kill(pid, 0)
        print '\tChild is alive. This is ambiguous because it may be a Zombie.'
    except OSError as e:
        print '\tChild appears to be dead.'
#       print str(e)
    print
项目:sardana    作者:sardana-org    | 项目源码 | 文件源码
def __fork_pty(self):
        """Substitute for the forkpty system call, meant to be more portable
        than pty.fork() (in particular on Solaris).

        Modified 10.06.05 by Geoff Marshall: Implemented __fork_pty() method to
        resolve the issue with Python's pty.fork() not supporting Solaris,
        particularly ssh. Based on patch to posixmodule.c authored by Noah
        Spurrier::

            http://mail.python.org/pipermail/python-dev/2003-May/035281.html

        Returns ``(pid, parent_fd)`` in both processes; ``pid`` is 0 in the
        child.
        """

        master_fd, slave_fd = os.openpty()
        if min(master_fd, slave_fd) < 0:
            raise ExceptionPexpect("Error! Could not open pty with os.openpty().")

        pid = os.fork()
        if pid < 0:
            raise ExceptionPexpect("Error! Failed os.fork().")

        if pid != 0:
            # Parent: only the master side is kept.
            os.close(slave_fd)
            return pid, master_fd

        # Child: adopt the slave side as controlling tty and as stdio.
        os.close(master_fd)
        self.__pty_make_controlling_tty(slave_fd)

        for std_fd in (0, 1, 2):
            os.dup2(slave_fd, std_fd)

        if slave_fd > 2:
            os.close(slave_fd)

        return pid, master_fd
项目:sardana    作者:sardana-org    | 项目源码 | 文件源码
def __pty_make_controlling_tty(self, tty_fd):
        """This makes the pseudo-terminal the controlling tty. This should be
        more portable than the pty.fork() function. Specifically, this should
        work on Solaris.

        ``tty_fd`` is an open descriptor of the child side of the pty.
        Raises ExceptionPexpect if /dev/tty can still be opened after
        os.setsid(), and OSError if the child pty or the new controlling tty
        cannot be opened afterwards.
        """

        child_name = os.ttyname(tty_fd)

        # Disconnect from controlling tty if still connected.  Opening
        # /dev/tty fails with OSError when there is no controlling tty to
        # begin with (e.g. when started without a terminal); that is fine.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
        except OSError:
            pass
        else:
            os.close(fd)

        os.setsid()

        # Verify we are disconnected from controlling tty.  Only OSError from
        # the open counts as success; the "still connected" error below must
        # not be swallowed by a catch-all handler.
        try:
            fd = os.open("/dev/tty", os.O_RDWR | os.O_NOCTTY)
        except OSError:
            # Good! We are disconnected from a controlling tty.
            pass
        else:
            os.close(fd)
            raise ExceptionPexpect("Error! We are not disconnected from a controlling tty.")

        # Verify we can open child pty.  os.open() signals failure by raising
        # OSError (it never returns a negative descriptor), so no explicit
        # result check is needed.
        fd = os.open(child_name, os.O_RDWR)
        os.close(fd)

        # Verify we now have a controlling tty.
        fd = os.open("/dev/tty", os.O_WRONLY)
        os.close(fd)
项目:gdebi    作者:linuxmint    | 项目源码 | 文件源码
def commit(self):
        """Install ``self.debfile`` with dpkg on a pty and mirror its output.

        The child runs ``/usr/bin/dpkg --auto-deconfigure -i`` with a
        non-interactive environment and exits with dpkg's status.  The parent
        pumps the pty output into the embedded console until the pty reports
        an error or end-of-file, then reaps the child, stores its exit status
        in ``self.exitstatus`` and updates the UI.
        """
        from errno import EAGAIN

        # ui
        self.status.setText(_("Installing '%s'...") % os.path.basename(self.debfile))
        # the command
        cmd = "/usr/bin/dpkg"
        argv = [cmd, "--auto-deconfigure", "-i", self.debfile]
        (self.child_pid, self.master_fd) = pty.fork()

        if self.child_pid == 0:
            os.environ["TERM"] = "dumb"
            if "DEBIAN_FRONTEND" not in os.environ:
                os.environ["DEBIAN_FRONTEND"] = "noninteractive"
            os.environ["APT_LISTCHANGES_FRONTEND"] = "none"
            exitstatus = subprocess.call(argv)
            os._exit(exitstatus)

        while True:
            #Read from pty and write to DumbTerminal
            try:
                (rlist, wlist, xlist) = select.select([self.master_fd],[],[], 0.001)
                if rlist:
                    line = os.read(self.master_fd, 255)
                    if not line:
                        # End-of-file: some platforms report a closed pty
                        # with an empty read instead of an error.  Without
                        # this check the loop would spin forever.
                        break
                    self.parent.konsole.insertWithTermCodes(utf8(line))
            except Exception as e:
                #print e
                if getattr(e, "errno", None) == EAGAIN:
                    continue
                break
            # keep the UI responsive while dpkg is running
            KApplication.kApplication().processEvents()
        # at this point we got a read error (or EOF) from the pty, that most
        # likely means that the client is dead
        (pid, status) = os.waitpid(self.child_pid, 0)
        self.exitstatus = os.WEXITSTATUS(status)

        self.progress.setValue(100)
        self.parent.closeButton.setEnabled(True)
        self.parent.closeButton.setVisible(True)
        self.parent.installationProgress.setVisible(False)
        QTimer.singleShot(1, self.parent.changeSize)
项目:gdebi    作者:linuxmint    | 项目源码 | 文件源码
def fork(self):
        """Fork onto a pty and prepare a non-interactive child environment.

        Stores pid and master fd on ``self`` and returns the pid (0 in the
        child).  Only the child's environment is modified.
        """
        self.child_pid, self.master_fd = pty.fork()
        in_child = self.child_pid == 0
        if in_child:
            os.environ["TERM"] = "dumb"
            # an already configured frontend is respected
            os.environ.setdefault("DEBIAN_FRONTEND", "noninteractive")
            os.environ["APT_LISTCHANGES_FRONTEND"] = "none"
        return self.child_pid
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def __init__(self, cmd, raw_debug = None):
        """Set up the terminal state and, if ``cmd`` is given, run it on a pty.

        With a command, the child runs it through ``subprocess.call`` and
        reports "t"/"f" on a pipe before exiting; the parent keeps pid, pty
        descriptor and the read end of that pipe.  Without a command the
        descriptors are set to -1 and a hint is shown instead.

        Raises RuntimeError on Windows (``os.name == "nt"``).
        """
        if os.name == "nt":
            raise RuntimeError("Windows does not support pseudo-terminal devices")

        if cmd:
            # Spawn the new process in a new PTY
            self.exit_pipe, child_end = os.pipe()
            self.exit_callback = None

            child_pid, pty_fd = pty.fork()
            if child_pid == 0:
                try:
                    os.environ["TERM"] = "xterm-256color"
                    # variables removed from the child's environment
                    scrubbed = ["PYTHONPATH", "LD_LIBRARY_PATH"]
                    if sys.platform == "darwin":
                        scrubbed.append("DYLD_LIBRARY_PATH")
                    for name in scrubbed:
                        if name in os.environ:
                            del os.environ[name]

                    status = subprocess.call(cmd, close_fds=True)
                    os.write(2, "\033[01;34mProcess has completed.\033[00m\n")
                    os.write(child_end, "t")
                    os._exit(status)
                except:
                    pass
                # only reached when the block above raised
                os.write(2, "\033[01;31mCommand '" + cmd[0] + "' failed to execute.\033[00m\n")
                os.write(child_end, "f")
                os._exit(1)

            os.close(child_end)

            self.pid = child_pid
            self.data_pipe = pty_fd
        else:
            self.exit_pipe = -1
            self.pid = -1
            self.data_pipe = -1

        self.rows = 25
        self.cols = 80
        self.term = TerminalEmulator(self.rows, self.cols)
        self.raw_debug = raw_debug
        self.completed = False

        self.term.response_callback = self.send_input

        if not cmd:
            self.process_input("\033[01;33mEnter desired command arguments, then run again.\033[00m\r\n")
            self.completed = True
        else:
            # Initialize terminal settings
            window_size = struct.pack("hhhh", self.rows, self.cols, 0, 0)
            fcntl.ioctl(self.data_pipe, termios.TIOCSWINSZ, window_size)
            attrs = termios.tcgetattr(self.data_pipe)
            termios.tcsetattr(self.data_pipe, termios.TCSAFLUSH, attrs)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
        """Check that input() on a pty returns what was typed on the terminal.

        A child is forked on a pty; it calls ``input(prompt)`` and reports
        the result through a pipe.  The parent writes ``terminal_input``
        (bytes) to the pty and compares the decoded report.  The test is
        skipped when stdin/stdout are not ttys or when pty.fork() fails.
        """
        if not (sys.stdin.isatty() and sys.stdout.isatty()):
            self.skipTest("stdin and stdout must be ttys")
        read_end, write_end = os.pipe()
        try:
            pid, master_fd = pty.fork()
        except (OSError, AttributeError) as exc:
            os.close(read_end)
            os.close(write_end)
            self.skipTest("pty.fork() raised {}".format(exc))
        if pid == 0:
            # Child
            try:
                # Make sure we don't get stuck if there's a problem
                signal.alarm(2)
                os.close(read_end)
                # Check the error handlers are accounted for
                if stdio_encoding:
                    sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                                 encoding=stdio_encoding,
                                                 errors='surrogateescape')
                    sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                                  encoding=stdio_encoding,
                                                  errors='replace')
                with open(write_end, "w") as report:
                    print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=report)
                    print(ascii(input(prompt)), file=report)
            except:
                traceback.print_exc()
            finally:
                # We don't want to return to unittest...
                os._exit(0)
        # Parent
        os.close(write_end)
        os.write(master_fd, terminal_input + b"\r\n")
        # Get results from the pipe; an empty (stripped) line means the other
        # end was closed, i.e. the child exited
        with open(read_end, "r") as rpipe:
            lines = list(iter(lambda: rpipe.readline().strip(), ""))
        # Check the result was got and corresponds to the user's terminal input
        if len(lines) != 2:
            # Something went wrong, try to get at stderr
            with open(master_fd, "r", encoding="ascii", errors="ignore") as child_output:
                self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                          % (len(lines), child_output.read()))
        os.close(master_fd)
        # Check we did exercise the GNU readline path
        tty_line, repr_line = lines[0], lines[1]
        self.assertIn(tty_line, {'tty = True', 'tty = False'})
        if tty_line != 'tty = True':
            self.skipTest("standard IO in should have been a tty")
        # ascii() -> eval() roundtrip; the text was produced by our own child
        input_result = eval(repr_line)
        if stdio_encoding:
            expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
        else:
            expected = terminal_input.decode(sys.stdin.encoding)  # what else?
        self.assertEqual(input_result, expected)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
        """Check that input() on a pty returns what was typed on the terminal.

        A child is forked on a pty; it calls ``input(prompt)`` and reports
        the result through a pipe.  The parent writes ``terminal_input``
        (bytes) to the pty and compares the decoded report.  The test is
        skipped when stdin/stdout are not ttys or when pty.fork() fails.
        """
        if not (sys.stdin.isatty() and sys.stdout.isatty()):
            self.skipTest("stdin and stdout must be ttys")
        read_end, write_end = os.pipe()
        try:
            pid, master_fd = pty.fork()
        except (OSError, AttributeError) as exc:
            os.close(read_end)
            os.close(write_end)
            self.skipTest("pty.fork() raised {0}".format(exc))
        if pid == 0:
            # Child
            try:
                # Make sure we don't get stuck if there's a problem
                signal.alarm(2)
                os.close(read_end)
                # Check the error handlers are accounted for
                if stdio_encoding:
                    sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                                 encoding=stdio_encoding,
                                                 errors='surrogateescape')
                    sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                                  encoding=stdio_encoding,
                                                  errors='replace')
                with open(write_end, "w") as report:
                    print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=report)
                    print(ascii(input(prompt)), file=report)
            except:
                traceback.print_exc()
            finally:
                # We don't want to return to unittest...
                os._exit(0)
        # Parent
        os.close(write_end)
        os.write(master_fd, terminal_input + b"\r\n")
        # Get results from the pipe; an empty (stripped) line means the other
        # end was closed, i.e. the child exited
        with open(read_end, "r") as rpipe:
            lines = list(iter(lambda: rpipe.readline().strip(), ""))
        # Check the result was got and corresponds to the user's terminal input
        if len(lines) != 2:
            # Something went wrong, try to get at stderr
            with open(master_fd, "r", encoding="ascii", errors="ignore") as child_output:
                self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                          % (len(lines), child_output.read()))
        os.close(master_fd)
        # Check we did exercise the GNU readline path
        tty_line, repr_line = lines[0], lines[1]
        self.assertIn(tty_line, set(['tty = True', 'tty = False']))
        if tty_line != 'tty = True':
            self.skipTest("standard IO in should have been a tty")
        # ascii() -> eval() roundtrip; the text was produced by our own child
        input_result = eval(repr_line)
        if stdio_encoding:
            expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
        else:
            expected = terminal_input.decode(sys.stdin.encoding)  # what else?
        self.assertEqual(input_result, expected)
项目:pwndemo    作者:zh-explorer    | 项目源码 | 文件源码
def __pty_make_controlling_tty(self, tty_fd):
        """Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

        Intended as a more portable alternative to pty.fork(); in
        particular it should also work on Solaris.

        Raises:
            OSError: if os.ttyname(), os.setsid() or one of the final two
                os.open() calls fails.
            Exception: if /dev/tty can still be opened after os.setsid().
        """
        ctty_path = "/dev/tty"
        slave_path = os.ttyname(tty_fd)

        # Step 1: let go of the current controlling tty, if there is one.
        # An OSError here just means there was none (e.g. started by cron).
        try:
            probe = os.open(ctty_path, os.O_RDWR | os.O_NOCTTY)
            if not probe < 0:
                os.close(probe)
        except OSError:
            pass

        # Step 2: start a new session.
        os.setsid()

        # Step 3: /dev/tty must no longer be openable. Success here is the
        # error case; the expected OSError is swallowed on purpose.
        try:
            probe = os.open(ctty_path, os.O_RDWR | os.O_NOCTTY)
            if not probe < 0:
                os.close(probe)
                raise Exception('Failed to disconnect from controlling tty. '
                                'It is still possible to open /dev/tty.')
        except OSError:
            pass

        # Step 4: open the child pty by name (note: no O_NOCTTY this time).
        # NOTE(review): presumably this open is what attaches the pty as
        # the new controlling terminal -- confirm per platform.
        probe = os.open(slave_path, os.O_RDWR)
        if probe < 0:
            raise Exception("Could not open child pty, " + slave_path)
        os.close(probe)

        # Step 5: /dev/tty must be openable again now.
        probe = os.open(ctty_path, os.O_WRONLY)
        if probe < 0:
            raise Exception("Could not open controlling tty, /dev/tty")
        os.close(probe)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
        """Check that input() returns ``terminal_input`` when run on a pty.

        A child is forked on a fresh pseudo-terminal with pty.fork(). The
        child calls input(prompt) and reports two lines back through a
        pipe: whether its stdin/stdout are ttys, and the ascii() form of
        what input() returned. The parent feeds ``terminal_input`` to the
        pty master and compares the decoded input with the child's result.

        Args:
            prompt: prompt string passed to input() in the child.
            terminal_input: bytes written to the pty master, followed by
                b"\\r\\n".
            stdio_encoding: if truthy, the child re-wraps sys.stdin and
                sys.stdout with this encoding before calling input().

        The test is skipped when the current stdin/stdout are not ttys,
        when pty.fork() is unavailable or fails, or when the child reports
        that its standard streams are not ttys.
        """
        if not sys.stdin.isatty() or not sys.stdout.isatty():
            self.skipTest("stdin and stdout must be ttys")
        r, w = os.pipe()
        try:
            pid, fd = pty.fork()
        except (OSError, AttributeError) as e:
            os.close(r)
            os.close(w)
            self.skipTest("pty.fork() raised {}".format(e))
        if pid == 0:
            # Child
            try:
                # Make sure we don't get stuck if there's a problem
                signal.alarm(2)
                os.close(r)
                # Check the error handlers are accounted for
                if stdio_encoding:
                    sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                                 encoding=stdio_encoding,
                                                 errors='surrogateescape')
                    sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                                  encoding=stdio_encoding,
                                                  errors='replace')
                with open(w, "w") as wpipe:
                    print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                    print(ascii(input(prompt)), file=wpipe)
            except:
                # Deliberately catch everything: the traceback goes to the
                # pty so the parent can show it, and the child must never
                # leave this branch alive.
                traceback.print_exc()
            finally:
                # We don't want to return to unittest...
                os._exit(0)
        # Parent
        os.close(w)
        os.write(fd, terminal_input + b"\r\n")
        # Get results from the pipe
        with open(r, "r") as rpipe:
            lines = []
            while True:
                line = rpipe.readline().strip()
                if line == "":
                    # The other end was closed => the child exited
                    break
                lines.append(line)
        # Check the result was got and corresponds to the user's terminal input
        if len(lines) != 2:
            # Something went wrong, try to get at stderr.
            # Read the master with os.read() and stop on OSError: Linux
            # raises EIO once the slave side is closed, which would
            # otherwise replace the failure message below with an
            # unrelated error.
            child_output = bytearray()
            while True:
                try:
                    chunk = os.read(fd, 3000)
                except OSError:  # Assume EIO
                    break
                if not chunk:
                    break
                child_output.extend(chunk)
            os.close(fd)
            child_output = child_output.decode("ascii", "ignore")
            self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                      % (len(lines), child_output))
        os.close(fd)
        # Check we did exercise the GNU readline path
        self.assertIn(lines[0], {'tty = True', 'tty = False'})
        if lines[0] != 'tty = True':
            self.skipTest("standard IO in should have been a tty")
        # NOTE: eval() is applied only to ascii() output written by our own
        # child process, never to external data.
        input_result = eval(lines[1])   # ascii() -> eval() roundtrip
        if stdio_encoding:
            expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
        else:
            expected = terminal_input.decode(sys.stdin.encoding)  # what else?
        self.assertEqual(input_result, expected)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def run_child(self, child, terminal_input):
        """Run ``child`` in a forked pty process and collect what it reports.

        ``child`` is called in the forked process with a writable text
        pipe; every non-empty stripped line it writes there is returned to
        the caller. ``terminal_input`` (bytes) is written to the pty master
        for the child to read as terminal input.

        Returns the list of lines when exactly two were received; any other
        count fails the test and includes whatever the child printed on
        the pty in the failure message. The test is skipped if pty.fork()
        is unavailable or fails.
        """
        # Test results travel child -> parent over this pipe.
        read_end, write_end = os.pipe()
        try:
            pid, master_fd = pty.fork()
        except (OSError, AttributeError) as exc:
            for pipe_fd in (read_end, write_end):
                os.close(pipe_fd)
            self.skipTest("pty.fork() raised {}".format(exc))
            raise
        if pid == 0:
            # Forked side.
            try:
                # Safety net so a wedged child cannot hang forever.
                signal.alarm(2)
                os.close(read_end)
                with open(write_end, "w") as wpipe:
                    child(wpipe)
            except:
                traceback.print_exc()
            finally:
                # Never hand control back to unittest in the child.
                os._exit(0)
        # Original process from here on.
        os.close(write_end)
        os.write(master_fd, terminal_input)
        with open(read_end, "r") as rpipe:
            # An empty (stripped) line means the writer closed the pipe,
            # i.e. the child is done.
            lines = list(iter(lambda: rpipe.readline().strip(), ""))
        if len(lines) != 2:
            # Wrong number of results: salvage the child's pty output for
            # the failure message. Linux raises EIO once the slave is
            # closed, so any OSError is treated like end of data.
            leftovers = bytearray()
            while True:
                try:
                    chunk = os.read(master_fd, 3000)
                except OSError:
                    chunk = b""
                if not chunk:
                    break
                leftovers.extend(chunk)
            os.close(master_fd)
            leftovers = leftovers.decode("ascii", "ignore")
            self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                      % (len(lines), leftovers))
        os.close(master_fd)
        return lines
项目:EmPyre    作者:EmpireProject    | 项目源码 | 文件源码
def generate(self):
        """Return the generated script text, or "" on failure.

        Reads the Login, Password, Listener, UserAgent and LittleSnitch
        option values, asks the main menu's stagers for a launcher for the
        listener, backslash-escapes the quotes in it and substitutes the
        launcher, login and password into the embedded script template.

        An empty string is returned (after printing a message) when the
        listener is not an EmPyre listener or the launcher comes back empty.
        """
        opts = self.options
        login = opts['Login']['Value']
        password = opts['Password']['Value']
        listener_name = opts['Listener']['Value']
        user_agent = opts['UserAgent']['Value']
        little_snitch = opts['LittleSnitch']['Value']

        if not self.mainMenu.listeners.is_listener_empyre(listener_name):
            print(helpers.color("[!] EmPyre listener required!"))
            return ""

        # Build the launcher, then escape both quote styles so it can sit
        # inside the quoted argument of the template below.
        raw_launcher = self.mainMenu.stagers.generate_launcher(
            listener_name, userAgent=user_agent, littlesnitch=little_snitch)
        launcher = raw_launcher.replace("'", "\\'").replace('"', '\\"')
        if launcher == "":
            print(helpers.color("[!] Error in launcher command generation."))
            return ""

        template = """
import os
import pty

def wall(host, pw):
    import os,pty
    pid, fd = pty.fork()
    if pid == 0:
        os.execvp('ssh', ['ssh', '-o StrictHostKeyChecking=no', host, '%s'])
        os._exit(1)

    os.read(fd, 1024)
    os.write(fd, '\\n' + pw + '\\n')

    result = []
    while True:
        try:
            data = os.read(fd, 1024)
            if data == "Password:":
                os.write(fd, pw + '\\n')

        except OSError:
            break
        if not data:
            break
        result.append(data)
    pid, status = os.waitpid(pid, 0)
    return status, ''.join(result)

status, output = wall('%s','%s')
print status
print output

"""
        # Placeholder order in the template: launcher, login, password.
        return template % (launcher, login, password)
项目:EmPyre    作者:EmpireProject    | 项目源码 | 文件源码
def generate(self):
        """Return the script text built from the module options.

        Reads the Login, Password and Command option values and substitutes
        them into the embedded script template (command first, then login
        and password). The values are inserted verbatim, without escaping.
        """
        opts = self.options
        login = opts['Login']['Value']
        password = opts['Password']['Value']
        command = opts['Command']['Value']

        template = """

import os
import pty

def wall(host, pw):
    import os,pty
    pid, fd = pty.fork()
    if pid == 0: # Child
        os.execvp('ssh', ['ssh', '-o StrictHostKeyChecking=no', host, '%s'])
        os._exit(1) # fail to execv

    # read '..... password:', write password
    os.read(fd, 1024)
    os.write(fd, '\\n' + pw + '\\n')

    result = []
    while True:
        try:
            data = os.read(fd, 1024)
            if data == "Password:":
                os.write(fd, pw + '\\n')

        except OSError:
            break
        if not data:
            break
        result.append(data)
    pid, status = os.waitpid(pid, 0)
    return status, ''.join(result)

status, output = wall('%s','%s')
print status
print output

"""
        # Placeholder order in the template: command, login, password.
        return template % (command, login, password)
项目:janus    作者:nks5295    | 项目源码 | 文件源码
def open(self, command, env=None):
        """Create a subprocess on a new pseudo-terminal using pty.fork().

        Args:
            command: shell-style command line; it is split with
                shlex.split() and the first word is the executable.
            env: optional mapping of environment variables to set in the
                child before exec (added to the inherited environment).

        Returns:
            False if the fork failed. In the parent, after a successful
            fork, nothing is returned (None); self.pid and self.fd then
            hold the child's pid and the pty master descriptor.

        Raises:
            IndexError: if ``command`` contains no words.
        """
        # parse command
        command_arr = shlex.split(command)
        executable = command_arr[0]

        # try to fork a new pty
        try:
            self.pid, self.fd = pty.fork()
        except Exception:
            return False

        if self.pid != 0:
            # master side: nothing more to do
            return None

        # Child process from here on. Whatever happens, it must never
        # return into the caller's code, hence the unconditional exit.
        try:
            # set requested environment variables
            if env:
                for key, value in env.items():
                    os.environ[key] = value

            # set tty attributes (best effort: a failure here should not
            # prevent the command from running)
            try:
                attrs = tty.tcgetattr(1)
                # Clear IGNBRK. The previous XOR toggled the bit and so
                # switched it ON whenever it started out cleared.
                attrs[0] = attrs[0] & ~tty.IGNBRK
                attrs[0] = attrs[0] | tty.BRKINT | tty.IXANY | tty.IMAXBEL
                attrs[2] = attrs[2] | tty.HUPCL
                attrs[3] = attrs[3] | tty.ICANON | tty.ECHO | tty.ISIG | tty.ECHOKE
                attrs[6][tty.VMIN] = 1
                attrs[6][tty.VTIME] = 0
                tty.tcsetattr(1, tty.TCSANOW, attrs)
            except Exception:
                pass

            # replace this process with the subprocess
            try:
                os.execvp(executable, command_arr)
            except OSError as exc:
                # fd 2 is the pty in the child, so the master side can
                # read the reason for the failure.
                os.write(2, ("%s: %s\n" % (executable, exc.strerror)).encode())
        finally:
            # Only reached when exec did not happen.
            os._exit(127)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def check_input_tty(self, prompt, terminal_input, stdio_encoding=None):
        """Check that input() returns ``terminal_input`` when run on a pty.

        A child is forked on a fresh pseudo-terminal with pty.fork(). The
        child calls input(prompt) and reports two lines back through a
        pipe: whether its stdin/stdout are ttys, and the ascii() form of
        what input() returned. The parent feeds ``terminal_input`` to the
        pty master and compares the decoded input with the child's result.

        Args:
            prompt: prompt string passed to input() in the child.
            terminal_input: bytes written to the pty master, followed by
                b"\\r\\n".
            stdio_encoding: if truthy, the child re-wraps sys.stdin and
                sys.stdout with this encoding before calling input().

        The test is skipped when the current stdin/stdout are not ttys,
        when pty.fork() is unavailable or fails, or when the child reports
        that its standard streams are not ttys.
        """
        if not sys.stdin.isatty() or not sys.stdout.isatty():
            self.skipTest("stdin and stdout must be ttys")
        r, w = os.pipe()
        try:
            pid, fd = pty.fork()
        except (OSError, AttributeError) as e:
            os.close(r)
            os.close(w)
            self.skipTest("pty.fork() raised {}".format(e))
        if pid == 0:
            # Child
            try:
                # Make sure we don't get stuck if there's a problem
                signal.alarm(2)
                os.close(r)
                # Check the error handlers are accounted for
                if stdio_encoding:
                    sys.stdin = io.TextIOWrapper(sys.stdin.detach(),
                                                 encoding=stdio_encoding,
                                                 errors='surrogateescape')
                    sys.stdout = io.TextIOWrapper(sys.stdout.detach(),
                                                  encoding=stdio_encoding,
                                                  errors='replace')
                with open(w, "w") as wpipe:
                    print("tty =", sys.stdin.isatty() and sys.stdout.isatty(), file=wpipe)
                    print(ascii(input(prompt)), file=wpipe)
            except:
                # Deliberately catch everything: the traceback goes to the
                # pty so the parent can show it, and the child must never
                # leave this branch alive.
                traceback.print_exc()
            finally:
                # We don't want to return to unittest...
                os._exit(0)
        # Parent
        os.close(w)
        os.write(fd, terminal_input + b"\r\n")
        # Get results from the pipe
        with open(r, "r") as rpipe:
            lines = []
            while True:
                line = rpipe.readline().strip()
                if line == "":
                    # The other end was closed => the child exited
                    break
                lines.append(line)
        # Check the result was got and corresponds to the user's terminal input
        if len(lines) != 2:
            # Something went wrong, try to get at stderr.
            # Read the master with os.read() and stop on OSError: Linux
            # raises EIO once the slave side is closed, which would
            # otherwise replace the failure message below with an
            # unrelated error.
            child_output = bytearray()
            while True:
                try:
                    chunk = os.read(fd, 3000)
                except OSError:  # Assume EIO
                    break
                if not chunk:
                    break
                child_output.extend(chunk)
            os.close(fd)
            child_output = child_output.decode("ascii", "ignore")
            self.fail("got %d lines in pipe but expected 2, child output was:\n%s"
                      % (len(lines), child_output))
        os.close(fd)
        # Check we did exercise the GNU readline path
        self.assertIn(lines[0], {'tty = True', 'tty = False'})
        if lines[0] != 'tty = True':
            self.skipTest("standard IO in should have been a tty")
        # NOTE: eval() is applied only to ascii() output written by our own
        # child process, never to external data.
        input_result = eval(lines[1])   # ascii() -> eval() roundtrip
        if stdio_encoding:
            expected = terminal_input.decode(stdio_encoding, 'surrogateescape')
        else:
            expected = terminal_input.decode(sys.stdin.encoding)  # what else?
        self.assertEqual(input_result, expected)
项目:black_zone    作者:zh-explorer    | 项目源码 | 文件源码
def __pty_make_controlling_tty(self, tty_fd):
        """Make the pseudo-terminal behind ``tty_fd`` the controlling tty.

        Intended as a more portable alternative to pty.fork(); in
        particular it should also work on Solaris.

        Raises:
            OSError: if os.ttyname(), os.setsid() or one of the final two
                os.open() calls fails.
            Exception: if /dev/tty can still be opened after os.setsid().
        """
        ctty_path = "/dev/tty"
        slave_path = os.ttyname(tty_fd)

        # Step 1: let go of the current controlling tty, if there is one.
        # An OSError here just means there was none (e.g. started by cron).
        try:
            probe = os.open(ctty_path, os.O_RDWR | os.O_NOCTTY)
            if not probe < 0:
                os.close(probe)
        except OSError:
            pass

        # Step 2: start a new session.
        os.setsid()

        # Step 3: /dev/tty must no longer be openable. Success here is the
        # error case; the expected OSError is swallowed on purpose.
        try:
            probe = os.open(ctty_path, os.O_RDWR | os.O_NOCTTY)
            if not probe < 0:
                os.close(probe)
                raise Exception('Failed to disconnect from controlling tty. '
                                'It is still possible to open /dev/tty.')
        except OSError:
            pass

        # Step 4: open the child pty by name (note: no O_NOCTTY this time).
        # NOTE(review): presumably this open is what attaches the pty as
        # the new controlling terminal -- confirm per platform.
        probe = os.open(slave_path, os.O_RDWR)
        if probe < 0:
            raise Exception("Could not open child pty, " + slave_path)
        os.close(probe)

        # Step 5: /dev/tty must be openable again now.
        probe = os.open(ctty_path, os.O_WRONLY)
        if probe < 0:
            raise Exception("Could not open controlling tty, /dev/tty")
        os.close(probe)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def ptyPopen(args, executable=None, env=None, shell=False):
                """Less featureful but inspired by subprocess.Popen.
                Runs subprocess in a pty and waits for it to finish.

                Args:
                    args: a single string (one argv word) or a sequence
                        of argv words.
                    executable: program to exec; defaults to args[0].
                        With shell=True it replaces "/bin/sh" as argv[0].
                    env: optional environment for the child; None
                        inherits the current environment.
                    shell: if true, run the arguments via "/bin/sh -c".

                Returns:
                    (retcode, output): the child's wait status shifted
                    right by 8 bits (its exit code when it exited
                    normally) and everything read from the pty, as bytes.
                    A child whose exec fails exits with status 99.
                """
                #
                # Note: In theory the right answer here is to subclass Popen,
                # but we found that in practice we'd have to reimplement most
                # of that class, because its handling of file descriptors is
                # too brittle in its _execute_child() code.
                #
                def __drain(master_fd, outlist):
                        # Use a list as a way to pass by reference.
                        # Unbuffered os.read() keeps every chunk that was
                        # actually delivered, even if the next read fails.
                        while True:
                                try:
                                        termdata = os.read(master_fd, 1024)
                                except OSError:
                                        # Linux raises EIO on the master
                                        # once the slave side is closed;
                                        # treat it like EOF.
                                        break
                                if not termdata:
                                        # EOF
                                        break
                                outlist.append(termdata)

                # This is the arg handling protocol from Popen
                if isinstance(args, six.string_types):
                        args = [args]
                else:
                        args = list(args)

                if shell:
                        args = ["/bin/sh", "-c"] + args
                        if executable:
                                args[0] = executable

                if executable is None:
                        executable = args[0]

                pid, fd = pty.fork()
                if pid == 0:
                        try:
                                # Child
                                if env is None:
                                        os.execvp(executable, args)
                                else:
                                        os.execvpe(executable, args, env)
                        except BaseException:
                                # exec failed: report on the pty and make
                                # sure the child never returns to the
                                # caller's code.
                                traceback.print_exc()
                                os._exit(99)

                # Parent: collect output in a thread while waiting for
                # the child, so a chatty child cannot block on a full pty.
                outlist = []
                t = threading.Thread(target=__drain,
                    args=(fd, outlist))
                t.start()
                _, status = os.waitpid(pid, 0)
                retcode = status >> 8
                t.join()
                # Release the pty master; it was previously leaked.
                os.close(fd)
                return retcode, b"".join(outlist)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def fakeroot_create():
        """Create a temporary image (the "fakeroot") to run pkg from.

        The image is created under g_tempdir in a directory named after
        the current pid, and a copy of the pkg command is placed inside.

        Returns a tuple (fakeroot directory, path of the copied pkg
        command). Raises RuntimeError if the pkg command path already
        exists inside the fakeroot.
        """

        per_run_root = os.path.join(g_tempdir,
            "ips.test.{0:d}".format(os.getpid()))
        fakeroot = os.path.join(per_run_root, "fakeroot")
        cmd_path = os.path.join(fakeroot, "pkg")

        # A successful stat means a previous fakeroot was left behind.
        leftover = True
        try:
                os.stat(cmd_path)
        except OSError:
                leftover = False
        if leftover:
                raise RuntimeError("The fakeroot shouldn't already exist.\n"
                    "Path is:{0}".format(cmd_path))

        # Sanitize the environment before creating anything, so that pkg
        # does not touch the real root.
        env_sanitize(cmd_path)

        #
        # The pkg apis try to access the image that contains the command
        # they were invoked from.  During a test run that command is
        # normally run.py in a developer's workspace, which lives in the
        # root image, and the root image must not be accessed while the
        # test suite runs.  So a temporary image is created here and the
        # pkg command is run from it instead.
        #

        # create directories
        for directory in (per_run_root, fakeroot):
                mkdir_eexist_ok(directory)

        debug("fakeroot image create {0}".format(fakeroot))
        tracker = pkg.client.progress.NullProgressTracker()
        pkg.client.api.image_create(PKG_CLIENT_NAME,
            CLIENT_API_VERSION, fakeroot,
            pkg.client.api.IMG_TYPE_ENTIRE, False,
            progtrack=tracker, cmdpath=cmd_path)

        #
        # Copy the pkg command into the fake root: recursive linked image
        # commands may fork and exec further copies of pkg(1), and those
        # should run the copy that lives in the fake root.
        #
        fakeroot_cmdpath = os.path.join(fakeroot, "pkg")
        pkg_source = os.path.join(g_pkg_path, "usr", "bin", "pkg")
        shutil.copy(pkg_source, fakeroot_cmdpath)

        return fakeroot, fakeroot_cmdpath