我们从 Python 开源项目中提取了以下 36 个代码示例,用于说明如何使用 os.set_inheritable()。
def test_pass_fds_inheritable(self):
    """Check that ``pass_fds`` hands over fds whatever their inheritable flag.

    One end of a pipe is flagged inheritable, the other is not, and both are
    listed in ``pass_fds``. The set of descriptors parsed from the child's
    output must equal ``pass_fds``, and the flags in the parent must be
    unchanged afterwards.
    """
    helper_script = support.findfile("fd_status.py", subdir="subprocessdata")

    inheritable, non_inheritable = os.pipe()
    for pipe_end in (inheritable, non_inheritable):
        self.addCleanup(os.close, pipe_end)
    os.set_inheritable(inheritable, True)
    os.set_inheritable(non_inheritable, False)

    pass_fds = (inheritable, non_inheritable)
    command = [sys.executable, helper_script] + [str(fd) for fd in pass_fds]

    child = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        close_fds=True,
        pass_fds=pass_fds,
    )
    output, _ = child.communicate()
    reported_fds = {int(field) for field in output.split(b',')}

    # the inheritable file descriptor must be inherited, so its inheritable
    # flag must be set in the child process after fork() and before exec()
    self.assertEqual(reported_fds, set(pass_fds), "output=%a" % output)

    # inheritable flag must not be changed in the parent process
    self.assertEqual(os.get_inheritable(inheritable), True)
    self.assertEqual(os.get_inheritable(non_inheritable), False)
def sendfile(self, file, offset=0, count=None):
    """sendfile(file[, offset[, count]]) -> sent

    Transmit *file* over the socket until EOF and return the total number
    of bytes sent. This implementation always delegates to
    ``self._sendfile_use_send``.

    *file* must be a regular file object opened in binary mode.
    *offset* is the position in the file to start reading from.
    *count*, when given, is the total number of bytes to transmit instead
    of sending until EOF.

    The file position is updated on return, and also on error, in which
    case ``file.tell()`` tells how many bytes were sent.
    The socket must be of SOCK_STREAM type; non-blocking sockets are not
    supported.

    .. versionadded:: 1.1rc4
       Added in Python 3.5, but available under all Python 3 versions in
       gevent.
    """
    sent = self._sendfile_use_send(file, offset, count)
    return sent

# get/set_inheritable new in 3.4
def set_inheritable(fd, inheritable):
    """Set or clear the inheritable state of descriptor *fd* via fcntl.

    A truthy *inheritable* clears FD_CLOEXEC on the descriptor; a falsy
    value sets it. The other descriptor flags are written back unchanged.
    """
    cloexec = fcntl.FD_CLOEXEC
    current = fcntl.fcntl(fd, fcntl.F_GETFD)
    updated = (current & ~cloexec) if inheritable else (current | cloexec)
    fcntl.fcntl(fd, fcntl.F_SETFD, updated)

# ...or shutil.which
def _close_fds(self, keep):
    """Close every descriptor from 3 up to MAXFD except those in *keep*.

    Each descriptor in *keep* is also marked inheritable (where
    ``os.set_inheritable`` exists) so that it survives a later exec.
    Descriptors 0-2 are never closed.

    :param keep: iterable of file descriptors to leave open; may be empty,
                 in which case everything from 3 up to MAXFD is closed.
    """
    keep_fds = sorted(set(keep))

    # Under new python versions, we need to explicitly set passed fds to be
    # inheritable or they will go away on exec. Every kept fd is handled
    # here, including the highest one.
    if hasattr(os, 'set_inheritable'):
        for fd in keep_fds:
            os.set_inheritable(fd, True)

    if hasattr(os, 'closerange'):
        close_range = os.closerange
    else:
        def close_range(low, high):
            # Fallback equivalent of os.closerange: errors from descriptors
            # that cannot be closed are ignored.
            for fd in range(low, high):
                try:
                    os.close(fd)
                except OSError:
                    pass

    # Walk the gaps between consecutive kept descriptors, closing each gap
    # as one half-open range [next_fd, fd).
    next_fd = 3
    for fd in keep_fds:
        if fd < next_fd:
            continue
        close_range(next_fd, fd)
        next_fd = fd + 1
    close_range(next_fd, MAXFD)
def set_inheritable(self, inheritable):
    """Set the inheritable flag of the handle returned by ``self.fileno()``."""
    handle = self.fileno()
    os.set_handle_inheritable(handle, inheritable)
def set_inheritable(self, inheritable):
    """Set the inheritable flag of the descriptor returned by ``self.fileno()``."""
    descriptor = self.fileno()
    os.set_inheritable(descriptor, inheritable)
def __init__(self, desc_cb, ws):
    """Create the pipe, record the initial state and start the worker thread.

    :param desc_cb: stored as ``self.desc_cb``.
    :param ws: stored as ``self.ws``.

    The write end of the pipe is made inheritable. A thread running
    ``self.process_loop`` is started before this method returns.
    """
    self.ws = ws

    read_end, write_end = os.pipe()
    self.pipe_r_sd, self.pipe_w_sd = read_end, write_end
    os.set_inheritable(self.pipe_w_sd, True)

    self.desc_cb = desc_cb
    self.processed = -1
    self.do_stop = False

    # All attributes are set before the thread starts running process_loop.
    worker = threading.Thread(target=self.process_loop)
    worker.start()
def set_inheritable(fd, inheritable):
    """Set or clear the inheritable state of descriptor *fd* via fcntl.

    A falsy *inheritable* sets FD_CLOEXEC on the descriptor; a truthy value
    clears it. The resulting flags are written back with F_SETFD.
    """
    descriptor_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    if not inheritable:
        descriptor_flags = descriptor_flags | fcntl.FD_CLOEXEC
    else:
        descriptor_flags = descriptor_flags & ~fcntl.FD_CLOEXEC
    fcntl.fcntl(fd, fcntl.F_SETFD, descriptor_flags)
def to_subprocess(self):
    """Make the underlying handle or descriptor inheritable and return it as text.

    When ``MS_WINDOWS`` is true, ``self._handle`` is used; otherwise
    ``self._fd`` is used. The return value is ``str()`` of that value.
    """
    if not MS_WINDOWS:
        set_inheritable(self._fd, True)
        return str(self._fd)

    set_handle_inheritable(self._handle, True)
    return str(self._handle)
def create_pipe():
    """Create an OS pipe with non-inheritable ends and wrap both ends.

    Returns a ``(ReadPipe, WritePipe)`` tuple built from the read and write
    descriptors of a fresh ``os.pipe()``.
    """
    read_fd, write_fd = os.pipe()

    # On Windows, os.pipe() creates non-inheritable handles
    if not MS_WINDOWS:
        for descriptor in (read_fd, write_fd):
            set_inheritable(descriptor, False)

    return (ReadPipe(read_fd), WritePipe(write_fd))
def test_get_set_inheritable(self):
    """A freshly opened fd is non-inheritable and can be made inheritable."""
    descriptor = os.open(__file__, os.O_RDONLY)
    self.addCleanup(os.close, descriptor)

    before = os.get_inheritable(descriptor)
    self.assertEqual(before, False)

    os.set_inheritable(descriptor, True)
    after = os.get_inheritable(descriptor)
    self.assertEqual(after, True)
def test_set_inheritable_cloexec(self):
    """``os.set_inheritable(fd, True)`` must clear FD_CLOEXEC on the fd."""
    descriptor = os.open(__file__, os.O_RDONLY)
    self.addCleanup(os.close, descriptor)
    cloexec = fcntl.FD_CLOEXEC

    # The flag is expected to be set right after os.open().
    flags_before = fcntl.fcntl(descriptor, fcntl.F_GETFD)
    self.assertEqual(flags_before & cloexec, cloexec)

    os.set_inheritable(descriptor, True)
    flags_after = fcntl.fcntl(descriptor, fcntl.F_GETFD)
    self.assertEqual(flags_after & cloexec, 0)
def test_pass_fds(self):
    """Each fd passed alone via ``pass_fds`` must be the only test fd kept.

    Five pipes are opened and made inheritable. For every resulting fd a
    child is started with ``close_fds=True`` and only that fd in
    ``pass_fds``; the child's output must list that fd and none of the
    other test fds. Combining ``close_fds=False`` with ``pass_fds`` must
    emit a RuntimeWarning.
    """
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    open_fds = set()
    for _ in range(5):
        read_end, write_end = os.pipe()
        self.addCleanup(os.close, read_end)
        self.addCleanup(os.close, write_end)
        os.set_inheritable(read_end, True)
        os.set_inheritable(write_end, True)
        open_fds.update((read_end, write_end))

    for fd in open_fds:
        child = subprocess.Popen(
            [sys.executable, fd_status],
            stdout=subprocess.PIPE,
            close_fds=True,
            pass_fds=(fd, ),
        )
        output, _ = child.communicate()

        remaining_fds = {int(field) for field in output.split(b',')}
        to_be_closed = open_fds - {fd}

        self.assertIn(fd, remaining_fds, "fd to be passed not passed")
        self.assertFalse(remaining_fds & to_be_closed,
                         "fd to be closed passed")

        # pass_fds overrides close_fds with a warning.
        with self.assertWarns(RuntimeWarning) as context:
            exit_status = subprocess.call(
                [sys.executable, "-c", "import sys; sys.exit(0)"],
                close_fds=False,
                pass_fds=(fd, ),
            )
            self.assertFalse(exit_status)
        self.assertIn('overriding close_fds', str(context.warning))
def test_close_fds(self):
    """Check which inheritable fds reach the child for each close_fds mode.

    Three scenarios are run against a set of inheritable descriptors:

    * ``close_fds=False``: every test fd must be reported by the child;
    * ``close_fds=True``: none of the test fds may be reported;
    * ``close_fds=True`` with a subset in ``pass_fds``: no test fd outside
      that subset may be reported.
    """
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    fds = os.pipe()
    self.addCleanup(os.close, fds[0])
    self.addCleanup(os.close, fds[1])

    open_fds = set(fds)
    # add a bunch more fds
    for _ in range(9):
        fd = os.open(os.devnull, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        open_fds.add(fd)

    for fd in open_fds:
        os.set_inheritable(fd, True)

    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=False)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertEqual(remaining_fds & open_fds, open_fds,
                     "Some fds were closed")

    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertFalse(remaining_fds & open_fds,
                     "Some fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")

    # Keep some of the fd's we opened open in the subprocess.
    # This tests _posixsubprocess.c's proper handling of fds_to_keep.
    # The kept fds are removed from open_fds, so they must actually be
    # handed to Popen and excluded from the "left open" check; otherwise
    # the assertion below could never fail.
    fds_to_keep = set(open_fds.pop() for _ in range(8))
    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True,
                         pass_fds=fds_to_keep)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
                     "Some fds not in pass_fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")
def test_close_fds(self):
    """Check which inheritable fds reach the child for each close_fds mode.

    Three scenarios are run against a set of inheritable descriptors:

    * ``close_fds=False``: every test fd must be reported by the child;
    * ``close_fds=True``: none of the test fds may be reported;
    * ``close_fds=True`` with a subset in ``pass_fds``: no test fd outside
      that subset may be reported.
    """
    fd_status = support.findfile("fd_status.py", subdir="subprocessdata")

    fds = os.pipe()
    self.addCleanup(os.close, fds[0])
    self.addCleanup(os.close, fds[1])

    open_fds = set(fds)
    # add a bunch more fds; os.devnull names the platform's null device
    for _ in range(9):
        fd = os.open(os.devnull, os.O_RDONLY)
        self.addCleanup(os.close, fd)
        open_fds.add(fd)

    for fd in open_fds:
        os.set_inheritable(fd, True)

    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=False)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertEqual(remaining_fds & open_fds, open_fds,
                     "Some fds were closed")

    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertFalse(remaining_fds & open_fds,
                     "Some fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")

    # Keep some of the fd's we opened open in the subprocess.
    # This tests _posixsubprocess.c's proper handling of fds_to_keep.
    # The kept fds are removed from open_fds, so they must actually be
    # handed to Popen and excluded from the "left open" check; otherwise
    # the assertion below could never fail.
    fds_to_keep = set(open_fds.pop() for _ in range(8))
    p = subprocess.Popen([sys.executable, fd_status],
                         stdout=subprocess.PIPE, close_fds=True,
                         pass_fds=fds_to_keep)
    output, ignored = p.communicate()
    remaining_fds = set(map(int, output.split(b',')))

    self.assertFalse((remaining_fds - fds_to_keep) & open_fds,
                     "Some fds not in pass_fds were left open")
    self.assertIn(1, remaining_fds, "Subprocess failed")
def fork_exec(cmd, exec_env=None, logfile=None, pass_fds=None):
    """Execute a command using fork/exec and return the child's pid.

    This is needed for programs that need path searching but cannot have a
    shell as their parent process, for example: glare. When glare starts it
    sets itself as the parent process for its own process group. Thus the
    pid that a Popen process would have is not the right pid to use for
    killing the process group. This patch gives the test env direct access
    to the actual pid.

    The child never returns into the caller's code: if redirecting output,
    adjusting descriptors or the exec itself fails, the traceback is
    written to the child's stderr and the child exits with status 127.

    :param cmd: Command line to execute, as a single string; it is split
                into arguments with ``shlex.split``.
    :param exec_env: A dictionary representing the environment with which
                     to run the command. A callable value is called with
                     the current value of that variable (or None) and its
                     result is used.
    :param logfile: A path to a file which will hold the stdout/err of the
                    child process. It is opened in 'r+b' mode, so it must
                    already exist.
    :param pass_fds: Sequence of file descriptors passed to the child.
    :returns: The pid of the forked child (in the parent process).
    :raises ValueError: If *cmd* contains no arguments.
    """
    # Imported before forking so the child does not have to import anything.
    import traceback

    env = os.environ.copy()
    if exec_env is not None:
        for env_name, env_val in exec_env.items():
            if callable(env_val):
                env[env_name] = env_val(env.get(env_name))
            else:
                env[env_name] = env_val

    # Parse the command before forking so that a malformed or empty command
    # raises in the caller instead of inside the forked child.
    args = shlex.split(cmd)
    if not args:
        raise ValueError("fork_exec() requires a non-empty command")

    pid = os.fork()
    if pid != 0:
        return pid

    # Child process from here on.
    try:
        if logfile:
            with open(logfile, 'r+b') as fptr:
                for desc in (1, 2):
                    # Best effort: point stdout/stderr at the log file.
                    try:
                        os.dup2(fptr.fileno(), desc)
                    except OSError:
                        pass
        if pass_fds and hasattr(os, 'set_inheritable'):
            # os.set_inheritable() is only available and needed
            # since Python 3.4. On Python 3.3 and older, file descriptors are
            # inheritable by default.
            for fd in pass_fds:
                os.set_inheritable(fd, True)

        os.execvpe(args[0], args, env)
    except Exception:
        # Write straight to fd 2: os._exit() below does not flush
        # Python-level buffers.
        os.write(2, traceback.format_exc().encode('utf-8', 'replace'))
    finally:
        # Only reached when exec did not replace the process. Leaving via
        # os._exit() keeps the child from running the caller's code or its
        # cleanup handlers as a duplicate process.
        os._exit(127)