The following code examples, extracted from open-source Python projects, illustrate how to use io.DEFAULT_BUFFER_SIZE.
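io.DEFAULT_BUFFER_SIZE is the default buffer size, in bytes, used by the standard library's buffered I/O classes (typically 8192 on CPython). Before the extracted examples, here is a minimal sketch of the most common pattern; the file name is a placeholder, not taken from any of the projects below.

import io

# Default chunk size used by io's buffered classes (typically 8192 bytes on CPython).
print(io.DEFAULT_BUFFER_SIZE)

# Common pattern: read a file in DEFAULT_BUFFER_SIZE chunks until EOF.
# "example.bin" is a placeholder path.
with open("example.bin", "rb") as f:
    while True:
        chunk = f.read(io.DEFAULT_BUFFER_SIZE)
        if not chunk:
            break
        # process chunk here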
def backport_makefile(self, mode="r", buffering=None, encoding=None,
                      errors=None, newline=None):
    """
    Backport of ``socket.makefile`` from Python 3.5.
    """
    if not set(mode) <= set(["r", "w", "b"]):
        raise ValueError(
            "invalid mode %r (only r, w, b allowed)" % (mode,)
        )
    writing = "w" in mode
    reading = "r" in mode or not writing
    assert reading or writing
    binary = "b" in mode
    rawmode = ""
    if reading:
        rawmode += "r"
    if writing:
        rawmode += "w"
    raw = SocketIO(self, rawmode)
    self._makefile_refs += 1
    if buffering is None:
        buffering = -1
    if buffering < 0:
        buffering = io.DEFAULT_BUFFER_SIZE
    if buffering == 0:
        if not binary:
            raise ValueError("unbuffered streams must be binary")
        return raw
    if reading and writing:
        buffer = io.BufferedRWPair(raw, raw, buffering)
    elif reading:
        buffer = io.BufferedReader(raw, buffering)
    else:
        assert writing
        buffer = io.BufferedWriter(raw, buffering)
    if binary:
        return buffer
    text = io.TextIOWrapper(buffer, encoding, errors, newline)
    text.mode = mode
    return text
def __init__(self, reader, writer,
             buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
    """Constructor.

    The arguments are two RawIO instances.
    """
    if max_buffer_size is not None:
        warnings.warn("max_buffer_size is deprecated", DeprecationWarning, 2)

    if not reader.readable():
        raise IOError('"reader" argument must be readable.')

    if not writer.writable():
        raise IOError('"writer" argument must be writable.')

    self.reader = BufferedReader(reader, buffer_size)
    self.writer = BufferedWriter(writer, buffer_size)
def readlines(self, hint=-1):
    # type: (int) -> List[bytes]
    """Read and return a list of lines from the stream.

    hint can be specified to control the number of lines read: no more lines will
    be read if the total size (in bytes/characters) of all lines so far exceeds hint.

    :type hint: int
    :returns: Lines of data
    :rtype: list of bytes
    """
    lines = []
    for line in self:  # type: ignore
        lines.append(line)
        if hint > 0 and len(lines) * io.DEFAULT_BUFFER_SIZE > hint:
            break
    return lines
def test_file_position_after_fromfile(self):
    # gh-4118
    sizes = [io.DEFAULT_BUFFER_SIZE//8,
             io.DEFAULT_BUFFER_SIZE,
             io.DEFAULT_BUFFER_SIZE*8]

    for size in sizes:
        f = open(self.filename, 'wb')
        f.seek(size-1)
        f.write(b'\0')
        f.close()

        for mode in ['rb', 'r+b']:
            err_msg = "%d %s" % (size, mode)

            f = open(self.filename, mode)
            f.read(2)
            np.fromfile(f, dtype=np.float64, count=1)
            pos = f.tell()
            f.close()
            assert_equal(pos, 10, err_msg=err_msg)
def get_hash(self, x):
    # TODO: cached for strings and paths.
    # Need to generalize the reading of:
    #   1) file paths
    #   2) small strings
    #   3) python_objects
    if isinstance(x, (dict, bytes)) or not os.path.isfile(x):
        # use function cache.
        return self._get_hash_from_hashable(self._transform_to_hashable(x))

    # For files.
    hash_obj = self.hash_function()
    iter_of_bytes = open(x, 'rb')
    try:
        data = iter_of_bytes.read(io.DEFAULT_BUFFER_SIZE)
        while data:
            hash_obj.update(data)
            data = iter_of_bytes.read(io.DEFAULT_BUFFER_SIZE)
    finally:
        iter_of_bytes.close()
    return hash_obj.digest()
def copy_file_data(src_file, dst_file, chunk_size=None):
    """Copy data from one file object to another.

    Arguments:
        src_file (io.IOBase): File open for reading.
        dst_file (io.IOBase): File open for writing.
        chunk_size (int, optional): Number of bytes to copy at
            a time (or `None` to use sensible default).

    """
    chunk_size = chunk_size or io.DEFAULT_BUFFER_SIZE
    read = src_file.read
    write = dst_file.write
    # The 'or None' is so that it works with binary and text files
    for chunk in iter(lambda: read(chunk_size) or None, None):
        write(chunk)
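A possible usage sketch for copy_file_data above; the file paths are placeholders, not part of the original project.

# Hypothetical source and destination paths, for illustration only.
with open("source.bin", "rb") as src, open("copy.bin", "wb") as dst:
    copy_file_data(src, dst)                        # chunks of io.DEFAULT_BUFFER_SIZE
    # copy_file_data(src, dst, chunk_size=1 << 20)  # or an explicit 1 MiB chunk size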
def read_chunks(file, size=io.DEFAULT_BUFFER_SIZE):
    """Yield pieces of data from a file-like object until EOF."""
    while True:
        chunk = file.read(size)
        if not chunk:
            break
        yield chunk
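For example, read_chunks pairs naturally with incremental hashing. A small sketch follows; hashlib and the file path are assumptions for illustration, not taken from the original project.

import hashlib

# Hypothetical path; hash a file incrementally using read_chunks from above.
digest = hashlib.sha256()
with open("large_file.dat", "rb") as f:
    for chunk in read_chunks(f):
        digest.update(chunk)
print(digest.hexdigest())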
def readall(self):
    """Read until EOF, using multiple read() calls."""
    res = bytearray()
    while True:
        data = self.read(DEFAULT_BUFFER_SIZE)
        if not data:
            break
        res += data
    if res:
        return bytes(res)
    else:
        # b'' or None
        return data
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
    """Create a new buffered reader using the given readable raw IO object.
    """
    if not raw.readable():
        raise IOError('"raw" argument must be readable.')

    _BufferedIOMixin.__init__(self, raw)
    if buffer_size <= 0:
        raise ValueError("invalid buffer size")
    self.buffer_size = buffer_size
    self._reset_read_buf()
    self._read_lock = Lock()
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
    if not raw.writable():
        raise IOError('"raw" argument must be writable.')

    _BufferedIOMixin.__init__(self, raw)
    if buffer_size <= 0:
        raise ValueError("invalid buffer size")
    if max_buffer_size is not None:
        warnings.warn("max_buffer_size is deprecated", DeprecationWarning,
                      self._warning_stack_offset)
    self.buffer_size = buffer_size
    self._write_buf = bytearray()
    self._write_lock = Lock()
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE, max_buffer_size=None):
    raw._checkSeekable()
    BufferedReader.__init__(self, raw, buffer_size)
    BufferedWriter.__init__(self, raw, buffer_size, max_buffer_size)
def test_base64io_decode_readline(bytes_to_read, expected_bytes_read):
    source_plaintext = os.urandom(io.DEFAULT_BUFFER_SIZE * 2)
    source_stream = io.BytesIO(base64.b64encode(source_plaintext))

    with Base64IO(source_stream) as decoder:
        test = decoder.readline(bytes_to_read)

    assert test == source_plaintext[:expected_bytes_read]
def readline(self, limit=-1):
    # type: (int) -> bytes
    """Read and return one line from the stream.

    If limit is specified, at most limit bytes will be read.

    .. note::

        Because the source that this reads from may not contain any EOL characters,
        we read "lines" in chunks of length ``io.DEFAULT_BUFFER_SIZE``.

    :type limit: int
    :rtype: bytes
    """
    return self.read(limit if limit > 0 else io.DEFAULT_BUFFER_SIZE)
def test_file_position_after_tofile(self):
    # gh-4118
    sizes = [io.DEFAULT_BUFFER_SIZE//8,
             io.DEFAULT_BUFFER_SIZE,
             io.DEFAULT_BUFFER_SIZE*8]

    for size in sizes:
        err_msg = "%d" % (size,)

        f = open(self.filename, 'wb')
        f.seek(size-1)
        f.write(b'\0')
        f.seek(10)
        f.write(b'12')
        np.array([0], dtype=np.float64).tofile(f)
        pos = f.tell()
        f.close()
        assert_equal(pos, 10 + 2 + 8, err_msg=err_msg)

        f = open(self.filename, 'r+b')
        f.read(2)
        f.seek(0, 1)  # seek between read&write required by ANSI C
        np.array([0], dtype=np.float64).tofile(f)
        pos = f.tell()
        f.close()
        assert_equal(pos, 10, err_msg=err_msg)