我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.execvpe()。
def reexec(self): """\ Relaunch the master and workers. """ if self.pidfile is not None: self.pidfile.rename("%s.oldbin" % self.pidfile.fname) self.reexec_pid = os.fork() if self.reexec_pid != 0: self.master_name = "Old Master" return environ = self.cfg.env_orig.copy() fds = [l.fileno() for l in self.LISTENERS] environ['GUNICORN_FD'] = ",".join([str(fd) for fd in fds]) os.chdir(self.START_CTX['cwd']) self.cfg.pre_exec(self) # exec the process using the original environnement os.execvpe(self.START_CTX[0], self.START_CTX['args'], environ)
def open_terminal(command="bash", columns=80, lines=24): p_pid, master_fd = pty.fork() if p_pid == 0: # Child. path, *args = shlex.split(command) args = [path] + args env = dict(TERM="linux", LC_ALL="en_GB.UTF-8", COLUMNS=str(columns), LINES=str(lines)) try: os.execvpe(path, args, env) except FileNotFoundError: print("Could not find the executable in %s. Press any key to exit." % path) exit() # set non blocking read flag = fcntl.fcntl(master_fd, fcntl.F_GETFD) fcntl.fcntl(master_fd, fcntl.F_SETFL, flag | os.O_NONBLOCK) # File-like object for I/O with the child process aka command. p_out = os.fdopen(master_fd, "w+b", 0) return Terminal(columns, lines), p_pid, p_out
def spawn(self): env = self.env env['TERM'] = 'linux' self.pid, self.master = pty.fork() if self.pid == 0: if callable(self.command): try: try: self.command() except: sys.stderr.write(traceback.format_exc()) sys.stderr.flush() finally: os._exit(0) else: os.execvpe(self.command[0], self.command, env) if self.main_loop is None: fcntl.fcntl(self.master, fcntl.F_SETFL, os.O_NONBLOCK) atexit.register(self.terminate)
def _execChild(self, path, uid, gid, executable, args, environment): """ The exec() which is done in the forked child. """ if path: os.chdir(path) if uid is not None or gid is not None: if uid is None: uid = os.geteuid() if gid is None: gid = os.getegid() # set the UID before I actually exec the process os.setuid(0) os.setgid(0) switchUID(uid, gid) os.execvpe(executable, args, environment)
def test_executionError(self): """ Raise an error during execvpe to check error management. """ cmd = self.getCommand('false') d = defer.Deferred() p = TrivialProcessProtocol(d) def buggyexecvpe(command, args, environment): raise RuntimeError("Ouch") oldexecvpe = os.execvpe os.execvpe = buggyexecvpe try: reactor.spawnProcess(p, cmd, [b'false'], env=None, usePTY=self.usePTY) def check(ignored): errData = b"".join(p.errData + p.outData) self.assertIn(b"Upon execvpe", errData) self.assertIn(b"Ouch", errData) d.addCallback(check) finally: os.execvpe = oldexecvpe return d
def _execute(path, argv, environ): if environ is None: os.execvp(path, argv) else: os.execvpe(path, argv, environ) return # endregion
def _execChild(self, path, settingUID, uid, gid, command, args, environment): if path: os.chdir(path) # set the UID before I actually exec the process if settingUID: switchUID(uid, gid) os.execvpe(command, args, environment)
def test_execvpe_with_bad_program(self): self.assertRaises(OSError, os.execvpe, 'no such app-', ['no such app-'], None)
def test_execvpe_with_bad_arglist(self): self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
def reexec(self): """\ Relaunch the master and workers. """ if self.reexec_pid != 0: self.log.warning("USR2 signal ignored. Child exists.") return if self.master_pid != 0: self.log.warning("USR2 signal ignored. Parent exists.") return master_pid = os.getpid() self.reexec_pid = os.fork() if self.reexec_pid != 0: return self.cfg.pre_exec(self) environ = self.cfg.env_orig.copy() environ['GUNICORN_PID'] = str(master_pid) if self.systemd: environ['LISTEN_PID'] = str(os.getpid()) environ['LISTEN_FDS'] = str(len(self.LISTENERS)) else: environ['GUNICORN_FD'] = ','.join( str(l.fileno()) for l in self.LISTENERS) os.chdir(self.START_CTX['cwd']) # exec the process using the original environment os.execvpe(self.START_CTX[0], self.START_CTX['args'], environ)
def execvpe(self): """Convenience method exec's the command replacing the current process. Returns: Does not return. May raise an OSError though. """ args = copy(self._args) if self._shell: assert len(args) == 1 if _is_windows(): # The issue here is that in Lib/subprocess.py in # the Python distribution, if shell=True the code # jumps through some funky hoops setting flags on # the Windows API calls. We need to do that, rather # than calling os.execvpe which doesn't let us set those # flags. So we spawn the child and then exit. self.popen() sys.exit(0) else: # this is all shell=True does on unix args = ['/bin/sh', '-c'] + args try: old_dir = os.getcwd() os.chdir(self._cwd) sys.stderr.flush() sys.stdout.flush() _verbose_logger().info("$ %s", " ".join(args)) os.execvpe(args[0], args, self._env) finally: # avoid side effect if exec fails (or is mocked in tests) os.chdir(old_dir)
def ExecExternalProgram(argv, environ): """ """ # TODO: If there is an error, like the file isn't executable, then we should # exit, and the parent will reap it. Should it capture stderr? try: os.execvpe(argv[0], argv, environ) except OSError as e: log('Unexpected error in execvpe(%r, %r, ...): %s', argv[0], argv, e) # Command not found means 127. TODO: Are there other cases? sys.exit(127) # no return
def reexec(self): """\ Relaunch the master and workers. """ if self.reexec_pid != 0: self.log.warning("USR2 signal ignored. Child exists.") return if self.master_pid != 0: self.log.warning("USR2 signal ignored. Parent exists") return master_pid = os.getpid() self.reexec_pid = os.fork() if self.reexec_pid != 0: return self.cfg.pre_exec(self) environ = self.cfg.env_orig.copy() fds = [l.fileno() for l in self.LISTENERS] environ['GUNICORN_FD'] = ",".join([str(fd) for fd in fds]) environ['GUNICORN_PID'] = str(master_pid) os.chdir(self.START_CTX['cwd']) # exec the process using the original environnement os.execvpe(self.START_CTX[0], self.START_CTX['args'], environ)
def create_execve(original_name): """ os.execve(path, args, env) os.execvpe(file, args, env) """ def new_execve(path, args, env): import os send_process_created_message() return getattr(os, original_name)(path, patch_args(args), env) return new_execve
def patch_new_process_functions_with_warning(): monkey_patch_os('execl', create_warn_multiproc) monkey_patch_os('execle', create_warn_multiproc) monkey_patch_os('execlp', create_warn_multiproc) monkey_patch_os('execlpe', create_warn_multiproc) monkey_patch_os('execv', create_warn_multiproc) monkey_patch_os('execve', create_warn_multiproc) monkey_patch_os('execvp', create_warn_multiproc) monkey_patch_os('execvpe', create_warn_multiproc) monkey_patch_os('spawnl', create_warn_multiproc) monkey_patch_os('spawnle', create_warn_multiproc) monkey_patch_os('spawnlp', create_warn_multiproc) monkey_patch_os('spawnlpe', create_warn_multiproc) monkey_patch_os('spawnv', create_warn_multiproc) monkey_patch_os('spawnve', create_warn_multiproc) monkey_patch_os('spawnvp', create_warn_multiproc) monkey_patch_os('spawnvpe', create_warn_multiproc) if sys.platform != 'win32': monkey_patch_os('fork', create_warn_multiproc) try: import _posixsubprocess monkey_patch_module(_posixsubprocess, 'fork_exec', create_warn_fork_exec) except ImportError: pass else: # Windows try: import _subprocess except ImportError: import _winapi as _subprocess monkey_patch_module(_subprocess, 'CreateProcess', create_CreateProcessWarnMultiproc)
def setup_lib_paths(fbdir, libdir): """This is a little bit of a hack, but it should work. If we detect that the EDFLIB_DIR is not in LD_LIBRARY_PATH, restart after adding it. """ try: libpath = os.environ['LD_LIBRARY_PATH'] except KeyError: libpath = "" if not (sys.platform == "win32") and (libdir not in libpath): # To get the Fuzzbunch environment setup properly, we need to modify LD_LIBRARY_PATH. # To do that, we need to rerun Fuzzbunch so that it picks up the new LD_LIBRARY_PATH os.environ['LD_LIBRARY_PATH'] = "%s:%s" % (libdir,libpath) #path = os.path.abspath(fbdir) #args = ['"' + path + '"'] + sys.argv[1:] #os.execvpe( 'python', ['python']+sys.argv, os.environ)
def xexec(args, stdin=None): """Replaces current process with command specified and passes in the current xonsh environment. """ env = builtins.__xonsh_env__ denv = env.detype() if len(args) > 0: try: os.execvpe(args[0], args, denv) except FileNotFoundError as e: return 'xonsh: ' + e.args[1] + ': ' + args[0] + '\n' else: return 'xonsh: exec: no args specified\n'
def test_errorDuringExec(self): """ When L{os.execvpe} raises an exception, it will format that exception on stderr as UTF-8, regardless of system encoding information. """ def execvpe(*args, **kw): # Ensure that real traceback formatting has some non-ASCII in it, # by forcing the filename of the last frame to contain non-ASCII. filename = u"<\N{SNOWMAN}>" if not isinstance(filename, str): filename = filename.encode("utf-8") codeobj = compile("1/0", filename, "single") eval(codeobj) self.patch(os, "execvpe", execvpe) self.patch(sys, "getfilesystemencoding", lambda: "ascii") reactor = self.buildReactor() output = io.BytesIO() @reactor.callWhenRunning def whenRunning(): class TracebackCatcher(ProcessProtocol, object): errReceived = output.write def processEnded(self, reason): reactor.stop() reactor.spawnProcess(TracebackCatcher(), pyExe, [pyExe, b"-c", b""]) self.runReactor(reactor, timeout=30) self.assertIn(u"\N{SNOWMAN}".encode("utf-8"), output.getvalue())