我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 io.BufferedWriter()。
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.

    Wraps ``self`` in a ``SocketIO`` raw stream and layers buffering and
    (for non-binary modes) text decoding on top of it.

    :param mode: Any combination of the characters ``r``, ``w`` and ``b``.
        When neither ``r`` nor ``w`` is given, reading is assumed.
    :param buffering: Buffer size. ``None`` or a negative value selects
        ``io.DEFAULT_BUFFER_SIZE``; ``0`` returns the raw stream itself.
    :param encoding: Passed to ``io.TextIOWrapper`` in text mode.
    :param errors: Passed to ``io.TextIOWrapper`` in text mode.
    :param newline: Passed to ``io.TextIOWrapper`` in text mode.
    :return: The raw ``SocketIO`` object, a buffered stream, or a
        ``TextIOWrapper`` whose ``mode`` attribute is set to ``mode``.
    :raises ValueError: If ``mode`` contains other characters, or if an
        unbuffered stream is requested in text mode.
    """
    permitted = set(["r", "w", "b"])
    if not set(mode).issubset(permitted):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )

    want_write = "w" in mode
    # Reading is the default whenever writing was not requested, so at
    # least one of the two flags is always set from here on.
    want_read = "r" in mode or not want_write
    is_binary = "b" in mode

    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    self._makefile_refs += 1

    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not is_binary:
            raise ValueError("unbuffered streams must be binary")
        return raw

    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        buffered = io.BufferedWriter(raw, buffering)

    if is_binary:
        return buffered

    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
def write(self, data):
    """Compress a byte string and write it to the underlying file.

    The whole operation runs while holding ``self._lock``. Because of
    buffering, the file on disk may not reflect the data written until
    close() is called.

    Returns the number of uncompressed bytes written, which is always
    the length of the data passed in.
    """
    with self._lock:
        self._check_can_write()
        # io.BufferedWriter hands over memoryview objects; turn those
        # into plain bytes before compressing.
        payload = data.tobytes() if isinstance(data, memoryview) else data
        self._fp.write(self._compressor.compress(payload))
        written = len(payload)
        self._pos += written
        return written

# NOTE(review): the comment below trails this method in the source and
# presumably introduces the definition that follows it.
# Rewind the file to the beginning of the data stream.
def __enter__(self):
    """Enter the context: wrap ``self.raw`` in a buffered writer.

    The new ``io.BufferedWriter`` is stored on ``self.writer`` and the
    object itself is returned so it can be bound by ``with ... as``.
    """
    buffered = io.BufferedWriter(self.raw)
    self.writer = buffered
    return self
def download(self, index, subindex, data, force_segment=False):
    """May be called to make a write operation without an Object Dictionary.

    The data is pushed through an ``io.BufferedWriter`` layered over a
    ``WritableStream``; closing the writer flushes whatever is still
    buffered.

    :param int index:
        Index of object to write.
    :param int subindex:
        Sub-index of object to write.
    :param bytes data:
        Data to be written.
    :param bool force_segment:
        Force use of segmented transfer regardless of data size.

    :raises canopen.SdoCommunicationError:
        On unexpected response or timeout.
    :raises canopen.SdoAbortedError:
        When node responds with an error.
    """
    total_size = len(data)
    sdo_stream = WritableStream(self, index, subindex, total_size,
                                force_segment)
    # NOTE(review): the 7-byte buffer presumably matches the payload of a
    # single SDO segment -- confirm against WritableStream.
    writer = io.BufferedWriter(sdo_stream, 7)
    writer.write(data)
    writer.close()
def __init__(self, path=None, mode='wt', use_gzip=False):
    """Open the output destination and store it on ``self.fd``.

    :param path: File to write. ``None`` selects standard output, in
        which case ``use_gzip`` is ignored. Missing parent directories
        are created.
    :param mode: Open mode. It is stored on ``self.mode`` and passed to
        the opener used for ``path``.
    :param use_gzip: Write ``path`` through ``gzip.open`` instead of
        ``io.open``.
    :raises OSError: If the parent directory cannot be created.
    """
    self.mode = mode

    if path is None:
        self.name = "stdout"
        if 'b' in mode:
            # Binary output needs a byte stream on the stdout descriptor.
            self.fd = os.fdopen(sys.stdout.fileno(), 'wb')
        else:
            self.fd = sys.stdout
        return

    self.name = os.path.basename(path)
    abspath = os.path.abspath(path)
    parent = os.path.dirname(abspath)

    # Create first and inspect the failure afterwards: checking for
    # existence before creating is open to a race with other processes.
    try:
        os.makedirs(parent)
    except OSError:
        if not os.path.isdir(parent):
            raise

    if not use_gzip:
        self.fd = io.open(abspath, mode)
    elif 't' in mode:
        # In text mode gzip.open already returns a text wrapper. Putting
        # a BufferedWriter (a bytes-only layer) on top of it would make
        # every write fail with TypeError.
        self.fd = gzip.open(abspath, mode)
    else:
        self.fd = io.BufferedWriter(gzip.open(abspath, mode))
def __init__(self, stream=None):
    """Set up a buffered writer over ``stream``.

    When ``stream`` is omitted (or falsy) a fresh in-memory ``BytesIO``
    is used instead. ``written_count`` starts at zero.
    """
    target = stream if stream else BytesIO()
    self.writer = BufferedWriter(target)
    self.written_count = 0

# region Writing
# "All numbers are written as little endian." |> Source: https://core.telegram.org/mtproto
def read(self, size):
    """
    Reads (receives) a whole block of size bytes from the connected peer.

    ``recv`` is called repeatedly until exactly ``size`` bytes have been
    collected.

    :param size: the size of the block to be read.
    :return: the read data with len(data) == size.
    :raises ConnectionResetError: if there is no socket.
    :raises TimeoutError: if the socket times out while receiving.
    :raises OSError: for socket errors other than ``EBADF``/``ENOTSOCK``,
        which are routed to ``self._raise_connection_reset()`` instead.
    """
    if self._socket is None:
        raise ConnectionResetError()

    if size == 0:
        # BufferedWriter refuses a zero buffer_size, and there is nothing
        # to receive anyway: an empty block is the correct answer.
        return b''

    # TODO Remove the timeout from this method, always use previous one
    with BufferedWriter(BytesIO(), buffer_size=size) as buffer:
        bytes_left = size
        while bytes_left != 0:
            try:
                partial = self._socket.recv(bytes_left)
            except socket.timeout as e:
                raise TimeoutError() from e
            except OSError as e:
                if e.errno in (errno.EBADF, errno.ENOTSOCK):
                    self._raise_connection_reset()
                else:
                    raise

            # An empty result from recv is treated as a lost connection.
            if len(partial) == 0:
                self._raise_connection_reset()

            buffer.write(partial)
            bytes_left -= len(partial)

        # If everything went fine, return the read bytes
        buffer.flush()
        return buffer.raw.getvalue()
def isfileobj(f):
    """Return True if ``f`` is an ``io`` raw file or buffered reader/writer."""
    file_types = (io.FileIO, io.BufferedReader, io.BufferedWriter)
    return isinstance(f, file_types)
def makefile(self, mode="r", buffering=None, *,
             encoding=None, errors=None, newline=None):
    """makefile(...) -> an I/O stream connected to the socket

    The arguments are as for io.open() after the filename, except the only
    mode characters supported are 'r', 'w' and 'b'. The semantics are
    similar too. (XXX refactor to share code?)

    A ``None`` or negative ``buffering`` selects io.DEFAULT_BUFFER_SIZE;
    ``buffering=0`` returns the raw SocketIO object and is only allowed
    in binary mode. Raises ValueError for an unsupported mode or for an
    unbuffered text stream.
    """
    if not set(mode).issubset({"r", "w", "b"}):
        raise ValueError("invalid mode %r (only r, w, b allowed)" % (mode,))

    want_write = "w" in mode
    # Reading is the default whenever writing was not requested, so at
    # least one of the two flags is always set from here on.
    want_read = "r" in mode or not want_write
    is_binary = "b" in mode

    raw_mode = ("r" if want_read else "") + ("w" if want_write else "")
    raw = SocketIO(self, raw_mode)
    self._io_refs += 1

    if buffering is None or buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not is_binary:
            raise ValueError("unbuffered streams must be binary")
        return raw

    if want_read and want_write:
        buffered = io.BufferedRWPair(raw, raw, buffering)
    elif want_read:
        buffered = io.BufferedReader(raw, buffering)
    else:
        buffered = io.BufferedWriter(raw, buffering)

    if is_binary:
        return buffered

    wrapper = io.TextIOWrapper(buffered, encoding, errors, newline)
    wrapper.mode = mode
    return wrapper
def __init__(self, popenToWrap):
    """Wrap a process object and expose buffered stdio streams for it.

    ``popenToWrap`` must provide ``stdout``, ``stderr`` and ``stdin``
    attributes with a ``fileno()`` method. Each descriptor is registered
    with an epoll object and paired with an in-memory ``StringIO``; the
    public ``stdout``/``stderr``/``stdin`` attributes are buffered
    streams built on ``self._streamWrapper`` objects.
    """
    self._streamLock = threading.Lock()
    self._proc = popenToWrap

    # One in-memory buffer per standard stream.
    self._stdout = StringIO()
    self._stderr = StringIO()
    self._stdin = StringIO()

    out_fd = self._proc.stdout.fileno()
    err_fd = self._proc.stderr.fileno()
    self._fdin = self._proc.stdin.fileno()
    self._closedfds = []

    # The output descriptors are watched for incoming data; stdin is
    # registered with an empty event mask.
    self._poller = poller = select.epoll()
    read_events = select.EPOLLIN | select.EPOLLPRI
    poller.register(out_fd, read_events)
    poller.register(err_fd, read_events)
    poller.register(self._fdin, 0)

    # Lookup table from descriptor to its in-memory buffer.
    self._fdMap = {
        out_fd: self._stdout,
        err_fd: self._stderr,
        self._fdin: self._stdin,
    }

    wrap = self._streamWrapper
    self.stdout = io.BufferedReader(wrap(self, self._stdout, out_fd),
                                    BUFFSIZE)
    self.stderr = io.BufferedReader(wrap(self, self._stderr, err_fd),
                                    BUFFSIZE)
    self.stdin = io.BufferedWriter(wrap(self, self._stdin, self._fdin),
                                   BUFFSIZE)

    self._returncode = None
    self.blocking = False
def open_gzip_file(filename, mode, use_system=True):
    """Open a gzip file, preferring the system gzip program if `use_system`
    is True, falling back to the gzip python library.

    Args:
        filename: Path of the gzip file to open.
        mode: The file open mode. 'r' selects reading (anything else
            selects writing on the system path), 't' requests a text
            stream and 'b' a buffered binary stream.
        use_system: Whether to try to use the system gzip program.

    Returns:
        A file-like object for the gzip file.
    """
    if use_system:
        try:
            if 'r' in mode:
                gzfile = GzipReader(filename)
            else:
                gzfile = GzipWriter(filename)
            if 't' in mode:
                gzfile = io.TextIOWrapper(gzfile)
            return gzfile
        except Exception:
            # Deliberate best-effort: any ordinary failure of the system
            # path falls through to the pure-Python implementation below.
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            pass

    gzfile = gzip.open(filename, mode)
    if 'b' in mode:
        if 'r' in mode:
            gzfile = io.BufferedReader(gzfile)
        else:
            gzfile = io.BufferedWriter(gzfile)
    return gzfile