我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.dup()。
def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' if duplex: s1, s2 = socket.socketpair() c1 = _multiprocessing.Connection(os.dup(s1.fileno())) c2 = _multiprocessing.Connection(os.dup(s2.fileno())) s1.close() s2.close() else: fd1, fd2 = os.pipe() c1 = _multiprocessing.Connection(fd1, writable=False) c2 = _multiprocessing.Connection(fd2, readable=False) return c1, c2
def __init__(self, fh, map=None, maxdata=None, ignore_broken_pipe=False, logger=None, **obsopt): """Wrap a dispatcher around the passed filehandle. If `ignore_broken_pipe` is `True`, an `EPIPE` or `EBADF` error will call `handle_close()` instead of `handle_expt()`. Useful when broken pipes should be handled quietly. `logger` is a logger which will be used to log unusual exceptions; otherwise, they will be printed to stderr. """ self.maxdata = maxdata if maxdata else self.pipe_maxdata self.__logger = logger if ignore_broken_pipe: self.__ignore_errno = [EPIPE, EBADF] else: self.__ignore_errno = [] self.__filehandle = fh # Check for overduplication of the file descriptor and close the extra fddup = os.dup(fh.fileno()) file_dispatcher.__init__(self, fddup, map=map) if (self._fileno != fddup): os.close (fddup) Observable.__init__(self, **obsopt)
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False): """ save targetfd descriptor, and open a new temporary file there. If no tmpfile is specified a tempfile.Tempfile() will be opened in text mode. """ self.targetfd = targetfd if tmpfile is None and targetfd != 0: f = tempfile.TemporaryFile('wb+') tmpfile = dupfile(f, encoding="UTF-8") f.close() self.tmpfile = tmpfile self._savefd = os.dup(self.targetfd) if patchsys: self._oldsys = getattr(sys, patchsysdict[targetfd]) if now: self.start()
def safe_text_dupfile(f, mode, default_encoding="UTF8"): """ return a open text file object that's a duplicate of f on the FD-level if possible. """ encoding = getattr(f, "encoding", None) try: fd = f.fileno() except Exception: if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"): # we seem to have a text stream, let's just use it return f else: newfd = os.dup(fd) if "b" not in mode: mode += "b" f = os.fdopen(newfd, mode, 0) # no buffering return EncodedFile(f, encoding or default_encoding)
def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' if duplex: s1, s2 = socket.socketpair() s1.setblocking(True) s2.setblocking(True) c1 = _multiprocessing.Connection(os.dup(s1.fileno())) c2 = _multiprocessing.Connection(os.dup(s2.fileno())) s1.close() s2.close() else: fd1, fd2 = os.pipe() c1 = _multiprocessing.Connection(fd1, writable=False) c2 = _multiprocessing.Connection(fd2, readable=False) return c1, c2
def main(): if sys.platform in ['win32', 'cygwin']: return 0 # This script is called by gclient. gclient opens its hooks subprocesses with # (stdout=subprocess.PIPE, stderr=subprocess.STDOUT) and then does custom # output processing that breaks printing '\r' characters for single-line # updating status messages as printed by curl and wget. # Work around this by setting stderr of the update.sh process to stdin (!): # gclient doesn't redirect stdin, and while stdin itself is read-only, a # dup()ed sys.stdin is writable, try # fd2 = os.dup(sys.stdin.fileno()); os.write(fd2, 'hi') # TODO: Fix gclient instead, http://crbug.com/95350 return subprocess.call( [os.path.join(os.path.dirname(__file__), 'update.sh')] + sys.argv[1:], stderr=os.fdopen( os.dup( sys.stdin.fileno())))
def create_child(*args): parentfp, childfp = socket.socketpair() pid = os.fork() if not pid: mitogen.core.set_block(childfp.fileno()) os.dup2(childfp.fileno(), 0) os.dup2(childfp.fileno(), 1) childfp.close() parentfp.close() os.execvp(args[0], args) childfp.close() # Decouple the socket from the lifetime of the Python socket object. fd = os.dup(parentfp.fileno()) parentfp.close() LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s', pid, fd, os.getpid(), Argv(args)) return pid, fd
def test_closerange(self): first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR) # We must allocate two consecutive file descriptors, otherwise # it will mess up other file descriptors (perhaps even the three # standard ones). second = os.dup(first) try: retries = 0 while second != first + 1: os.close(first) retries += 1 if retries > 10: # XXX test skipped self.skipTest("couldn't allocate two consecutive fds") first, second = second, os.dup(second) finally: os.close(second) # close a fd that is open, and one that isn't os.closerange(first, first + 2) self.assertRaises(OSError, os.write, first, b"a")
def _mock_output_restore(self): """ Mock methods to ensure that stdout and stderr are restored, after they have been captured. Return the path to the tempfile that was used to capture the output. """ old_stdout = os.dup(1) old_stderr = os.dup(2) fd, outfile = tempfile.mkstemp() mkstemp_patcher = mock.patch("tempfile.mkstemp") mock_mkstemp = mkstemp_patcher.start() self.addCleanup(mkstemp_patcher.stop) mock_mkstemp.return_value = (fd, outfile) dup_patcher = mock.patch("os.dup") mock_dup = dup_patcher.start() self.addCleanup(dup_patcher.stop) mock_dup.side_effect = lambda fd: {1: old_stdout, 2: old_stderr}[fd] dup2_patcher = mock.patch("os.dup2", wraps=os.dup2) mock_dup2 = dup2_patcher.start() self.addCleanup(dup2_patcher.stop) return outfile, mock_dup2
def _setup_pipe(self, name): from fcntl import fcntl, F_GETFL, F_SETFL real_fd = getattr(sys, '__%s__' % name).fileno() save_fd = os.dup(real_fd) self._save_fds[name] = save_fd pipe_out, pipe_in = os.pipe() os.dup2(pipe_in, real_fd) os.close(pipe_in) self._real_fds[name] = real_fd # make pipe_out non-blocking flags = fcntl(pipe_out, F_GETFL) fcntl(pipe_out, F_SETFL, flags | os.O_NONBLOCK) return pipe_out
def callSolver(self, lp, callback = None): """Solves the problem with yaposib """ if self.msg == 0: #close stdout to get rid of messages tempfile = open(mktemp(),'w') savestdout = os.dup(1) os.close(1) if os.dup(tempfile.fileno()) != 1: raise PulpSolverError("couldn't redirect stdout - dup() error") self.solveTime = -clock() lp.solverModel.solve(self.mip) self.solveTime += clock() if self.msg == 0: #reopen stdout os.close(1) os.dup(savestdout) os.close(savestdout)
def start_pager(): if not sys.stdout.isatty(): return pager = get_pager() if not pager: return if "LESS" not in os.environ: os.environ["LESS"] = "FRSX" oldstdout = os.dup(1) global pagerproc pagerproc = subprocess.Popen([pager], stdin=subprocess.PIPE, stdout=oldstdout, close_fds=True) os.close(oldstdout) os.dup2(pagerproc.stdin.fileno(), 1)
def stdchannel_redirected(stdchannel, dest_filename): """ A context manager to temporarily redirect stdout or stderr e.g.: with stdchannel_redirected(sys.stderr, os.devnull): if compiler.has_function('clock_gettime', libraries=['rt']): libraries.append('rt') Code adapted from https://stackoverflow.com/a/17752455/1382869 """ try: oldstdchannel = os.dup(stdchannel.fileno()) dest_file = open(dest_filename, 'w') os.dup2(dest_file.fileno(), stdchannel.fileno()) yield finally: if oldstdchannel is not None: os.dup2(oldstdchannel, stdchannel.fileno()) if dest_file is not None: dest_file.close()
def test_closerange(self): first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR) # We must allocate two consecutive file descriptors, otherwise # it will mess up other file descriptors (perhaps even the three # standard ones). second = os.dup(first) try: retries = 0 while second != first + 1: os.close(first) retries += 1 if retries > 10: # XXX test skipped self.skipTest("couldn't allocate two consecutive fds") first, second = second, os.dup(second) finally: os.close(second) # close a fd that is open, and one that isn't os.closerange(first, first + 2) self.assertRaises(OSError, os.write, first, "a")
def __init__(self, stream, filename): self._stream = stream self._fd = stream.fileno() self._filename = filename # Keep original stream for later self._uncaptured_fd = os.dup(self._fd) # Open file to save stream to cap_fd = os.open(self._filename, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, 0600) # Send stream to this file self._stream.flush() os.dup2(cap_fd, self._fd) os.close(cap_fd)
def __enter__(self): assert self._original_fds is None, 'You can only use this object once' # Save the original FDs and file objects. # FIXME: How about stdin? self._original_fds = [(fd, os.dup(fd)) for fd in (1, 2)] self._old_stdout = sys.stdout self._old_stderr = sys.stderr # Redirect the Python file objects. throwaway_fileno, self._redirected_path = tempfile.mkstemp() # We need to reopen the file or Python and non-Python will have two different # positions in the file (and we can't build a file object from a fileno). os.close(throwaway_fileno) self._redirected_file = open(self._redirected_path, 'w') sys.stdout = self._redirected_file sys.stderr = self._redirected_file # Redirect the standard FDs. os.dup2(self._redirected_file.fileno(), 1) os.dup2(self._redirected_file.fileno(), 2) return self
def __enter__(self): self.sys = sys # save previous stdout/stderr self.saved_streams = saved_streams = sys.__stdout__, sys.__stderr__ self.fds = fds = [s.fileno() for s in saved_streams] self.saved_fds = map(os.dup, fds) # flush any pending output for s in saved_streams: s.flush() # open surrogate files if self.combine: null_streams = [open(self.outfiles[0], self.mode, 0)] * 2 if self.outfiles[0] != os.devnull: # disable buffering so output is merged immediately sys.stdout, sys.stderr = map(os.fdopen, fds, ['w']*2, [0]*2) else: null_streams = [open(f, self.mode, 0) for f in self.outfiles] self.null_fds = null_fds = [s.fileno() for s in null_streams] self.null_streams = null_streams # overwrite file objects and low-level file descriptors map(os.dup2, null_fds, fds)
def __init__(self, f, mode='r', bufsize=-1): if not isinstance(f, six.string_types + (int, file)): raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f) if isinstance(f, six.string_types): f = open(f, mode, 0) if isinstance(f, int): fileno = f self._name = "<fd:%d>" % fileno else: fileno = os.dup(f.fileno()) self._name = f.name if f.mode != mode: raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode)) self._name = f.name f.close() super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode, bufsize) set_nonblocking(self) self.softspace = 0
def __init__(self, name, mode='r', closefd=True, opener=None): if isinstance(name, int): fileno = name self._name = "<fd:%d>" % fileno else: assert isinstance(name, six.string_types) with open(name, mode) as fd: self._name = fd.name fileno = _original_os.dup(fd.fileno()) notify_opened(fileno) self._fileno = fileno self._mode = mode self._closed = False set_nonblocking(self) self._seekable = None
def test_small_errpipe_write_fd(self): """Issue #15798: Popen should work when stdio fds are available.""" new_stdin = os.dup(0) new_stdout = os.dup(1) try: os.close(0) os.close(1) # Side test: if errpipe_write fails to have its CLOEXEC # flag set this should cause the parent to think the exec # failed. Extremely unlikely: everyone supports CLOEXEC. subprocess.Popen([ sys.executable, "-c", "print('AssertionError:0:CLOEXEC failure.')"]).wait() finally: # Restore original stdin and stdout os.dup2(new_stdin, 0) os.dup2(new_stdout, 1) os.close(new_stdin) os.close(new_stdout)
def test_close_fds_after_preexec(self): fd_status = support.findfile("fd_status.py", subdir="subprocessdata") # this FD is used as dup2() target by preexec_fn, and should be closed # in the child process fd = os.dup(1) self.addCleanup(os.close, fd) p = subprocess.Popen([sys.executable, fd_status], stdout=subprocess.PIPE, close_fds=True, preexec_fn=lambda: os.dup2(1, fd)) output, ignored = p.communicate() remaining_fds = set(map(int, output.split(b','))) self.assertNotIn(fd, remaining_fds)
def get_high_socket_fd(self): if WIN32: # The child process will not have any socket handles, so # calling socket.fromfd() should produce WSAENOTSOCK even # if there is a handle of the same number. return socket.socket().detach() else: # We want to produce a socket with an fd high enough that a # freshly created child process will not have any fds as high. fd = socket.socket().detach() to_close = [] while fd < 50: to_close.append(fd) fd = os.dup(fd) for x in to_close: os.close(x) return fd
def redirect_stdout_to(stream_with_fd): if stream_with_fd is None: # no redirect when stream is 'None' yield else: # do not redirect, if streams do not have file descriptors! try: _sys_stdout_fileno = sys.stdout.fileno() _out_stream_fileno = stream_with_fd.fileno() except: yield else: # save the old stdout stream old_out_stream = os.dup(sys.stdout.fileno()) os.dup2(stream_with_fd.fileno(), sys.stdout.fileno()) # if all OK, yield back to the caller, catching exceptions try: yield finally: # restore the previous output stream os.dup2(old_out_stream, sys.stdout.fileno()) os.close(old_out_stream)
def pya(): global global_pya import pyaudio if global_pya == None: # suppress Jack and ALSA error messages on Linux. nullfd = os.open("/dev/null", 1) oerr = os.dup(2) os.dup2(nullfd, 2) global_pya = pyaudio.PyAudio() os.dup2(oerr, 2) os.close(oerr) os.close(nullfd) return global_pya # find the lowest supported input rate >= rate. # needed on Linux but not the Mac (which converts as needed).
def _run_pcap_parser(self, rx_tcpdump, interface): rx, tx = pipe() # dupe pids for the child process tcpdump, out = dup(rx_tcpdump), dup(tx) close(tx) close(rx_tcpdump) pid = fork() if pid: close(out) close(tcpdump) self.child_parser = pid self.rx = rx else: # get rid of unnecessary privileges drop_privileges() # start parsing packets start_pcap_parser(tcpdump, out, interface)
def generateHits(self, cont): """ generates the hits.csv file """ hitf = self.outdir + "/hits.csv" old = os.dup(1) sys.stdout.flush() os.close(1) os.open(hitf, os.O_WRONLY | os.O_CREAT) cont.printallHits() sys.stdout.flush() os.close(1) os.dup(old) os.close(old) lines = open(hitf).readlines() random.shuffle(lines) open(hitf, 'w').writelines(lines)
def readStdin(self): app.log.info('reading from stdin') # Create a new input stream for the file data. # Fd is short for file descriptor. os.dup and os.dup2 will duplicate file # descriptors. stdinFd = sys.stdin.fileno() newFd = os.dup(stdinFd) newStdin = open("/dev/tty") os.dup2(newStdin.fileno(), stdinFd) # Create a text buffer to read from alternate stream. textBuffer = self.newTextBuffer() try: with io.open(newFd, "r") as fileInput: textBuffer.fileFilter(fileInput.read()) except Exception as e: app.log.exception(e) app.log.info('finished reading from stdin') return textBuffer
def stdchannel_redirected(stdchannel, dest_filename): """ A context manager to temporarily redirect stdout or stderr :param stdchannel: the channel to redirect :type stdchannel: sys.std :param dest_filename: the filename to redirect output to :type dest_filename: os.devnull :return: nothing :rtype: None """ try: oldstdchannel = os.dup(stdchannel.fileno()) dest_file = open(dest_filename, 'w') os.dup2(dest_file.fileno(), stdchannel.fileno()) yield finally: if oldstdchannel is not None: os.dup2(oldstdchannel, stdchannel.fileno()) if dest_file is not None: dest_file.close()