我们从Python开源项目中,提取了以下9个代码示例,用于说明如何使用os.set_blocking()。
def __init__(self, args, **kwargs): if 'universal_newlines' in kwargs: raise RuntimeError('universal_newlines argument not supported') # If stdin has been given and it's set to a curio FileStream object, # then we need to flip it to blocking. if 'stdin' in kwargs: stdin = kwargs['stdin'] if isinstance(stdin, FileStream): # At hell's heart I stab thy coroutine attempting to read from a stream # that's been used as a pipe input to a subprocess. Must set back to # blocking or all hell breaks loose in the child. os.set_blocking(stdin.fileno(), True) self._popen = subprocess.Popen(args, **kwargs) if self._popen.stdin: self.stdin = FileStream(self._popen.stdin) if self._popen.stdout: self.stdout = FileStream(self._popen.stdout) if self._popen.stderr: self.stderr = FileStream(self._popen.stderr)
def _set_nonblocking(fd): os.set_blocking(fd, False)
def check_reopen(r1, w): try: print("Reopening read end") r2 = os.open("/proc/self/fd/{}".format(r1), os.O_RDONLY) print("r1 is {}, r2 is {}".format(r1, r2)) print("checking they both can receive from w...") os.write(w, b"a") assert os.read(r1, 1) == b"a" os.write(w, b"b") assert os.read(r2, 1) == b"b" print("...ok") print("setting r2 to non-blocking") os.set_blocking(r2, False) print("os.get_blocking(r1) ==", os.get_blocking(r1)) print("os.get_blocking(r2) ==", os.get_blocking(r2)) # Check r2 is really truly non-blocking try: os.read(r2, 1) except BlockingIOError: print("r2 definitely seems to be in non-blocking mode") # Check that r1 is really truly still in blocking mode def sleep_then_write(): time.sleep(1) os.write(w, b"c") threading.Thread(target=sleep_then_write, daemon=True).start() assert os.read(r1, 1) == b"c" print("r1 definitely seems to be in blocking mode") except Exception as exc: print("ERROR: {!r}".format(exc))
def __init__(self, fd, map=None): dispatcher.__init__(self, None, map) self.connected = True try: fd = fd.fileno() except AttributeError: pass self.set_file(fd) # set it to non-blocking mode os.set_blocking(fd, False)
def __init__(self, fileobj): assert not isinstance(fileobj, io.TextIOBase), 'Only binary mode files allowed' super().__init__(fileobj) os.set_blocking(int(self._fileno), False) # Common bound methods self._file_read = fileobj.read self._file_write = fileobj.write
def blocking(self): ''' Allow temporary access to the underlying file in blocking mode ''' if self._buffer: raise IOError('There is unread buffered data.') try: os.set_blocking(int(self._fileno), True) yield self._file finally: os.set_blocking(int(self._fileno), False)