我们从 Python 开源项目中,提取了以下 50 个代码示例,用于说明如何使用 io.BufferedIOBase()。
def readline(self, size=-1):
    """Read a line of uncompressed bytes from the file.

    The terminating newline (if present) is retained. If size is
    non-negative, no more than size bytes will be read (in which
    case the line may be incomplete). Returns b'' if already at EOF.
    """
    # Raises if the stream is closed or not opened for reading.
    self._check_can_read()
    # Shortcut for the common case - the whole line is in the buffer.
    if size < 0:
        end = self._buffer.find(b"\n", self._buffer_offset) + 1
        if end > 0:
            line = self._buffer[self._buffer_offset : end]
            self._buffer_offset = end
            self._pos += len(line)
            return line
    # Slow path: delegate to the generic buffered implementation, which
    # pulls more data through this object's read methods as needed.
    return io.BufferedIOBase.readline(self, size)
def __init__(self, lsf_path: str, types: List[Type[pyimc.Message]] = None, make_index=True):
    """
    Reads an LSF file.
    :param lsf_path: The path to the LSF file.
    :param types: The message types to return. List of pyimc message classes.
    :param make_index: If true, an index that speeds up subsequent reads is created.
    """
    self.fpath = lsf_path
    # File handle; opened lazily elsewhere (e.g. in a context manager).
    self.f = None  # type: io.BufferedIOBase
    self.header = IMCHeader()  # Preallocate header buffer
    self.parser = pyimc.Parser()
    # Index mapping message id/name to file offsets (built on demand).
    self.idx = {}  # type: Dict[Union[int, str], List[int]]
    self.make_index = make_index
    if types:
        # Translate pyimc message classes into their numeric IMC ids.
        self.msg_types = [pyimc.Factory.id_from_abbrev(x.__name__) for x in types]
    else:
        # None means "yield every message type".
        self.msg_types = None
def _upload_media_py3(self, media_type, media_file, extension=''):
    """Upload a media file to the WeChat media endpoint (Python 3 path).

    ``media_file`` may be an opened binary file object (the extension is
    inferred from its name) or an ``io.BytesIO`` (the caller must supply
    ``extension``). Raises ValueError for disallowed or unknown types.
    """
    if isinstance(media_file, io.IOBase) and hasattr(media_file, 'name'):
        # Real file object: derive the extension from its file name.
        extension = media_file.name.split('.')[-1].lower()
        if not is_allowed_extension(extension):
            raise ValueError('Invalid file type.')
        filename = media_file.name
    elif isinstance(media_file, io.BytesIO):
        # In-memory buffer has no name; caller must provide the extension.
        extension = extension.lower()
        if not is_allowed_extension(extension):
            raise ValueError('Please provide \'extension\' parameters when the type of \'media_file\' is \'io.BytesIO\'.')
        filename = 'temp.' + extension
    else:
        raise ValueError('Parameter media_file must be io.BufferedIOBase(open a file with \'rb\') or io.BytesIO object.')
    return self.request.post(
        url='https://api.weixin.qq.com/cgi-bin/media/upload',
        params={
            'type': media_type,
        },
        files={
            'media': (filename, media_file, convert_ext_to_mime(extension))
        }
    )
def _get_writable(stream_or_path, mode): """This method returns a tuple containing the stream and a flag to indicate if the stream should be automatically closed. The `stream_or_path` parameter is returned if it is an open writable stream. Otherwise, it treats the `stream_or_path` parameter as a file path and opens it with the given mode. It is used by the svg and png methods to interpret the file parameter. :type stream_or_path: str | io.BufferedIOBase :type mode: str | unicode :rtype: (io.BufferedIOBase, bool) """ is_stream = hasattr(stream_or_path, 'write') if not is_stream: # No stream provided, treat "stream_or_path" as path stream_or_path = open(stream_or_path, mode) return stream_or_path, not is_stream
def readline(self, size=-1):
    """Read a line of uncompressed bytes from the file.

    The terminating newline (if present) is retained. If size is
    non-negative, no more than size bytes will be read (in which
    case the line may be incomplete). Returns b'' if already at EOF.
    """
    # Accept any object providing __index__ (PEP 357), mirroring the
    # behaviour of the C io implementation.
    if not isinstance(size, int):
        if not hasattr(size, "__index__"):
            raise TypeError("Integer argument expected")
        size = size.__index__()
    # The lock is recursive, so the fallback readline() below may
    # re-enter read methods that also take it.
    with self._lock:
        self._check_can_read()
        # Shortcut for the common case - the whole line is in the buffer.
        if size < 0:
            end = self._buffer.find(b"\n", self._buffer_offset) + 1
            if end > 0:
                line = self._buffer[self._buffer_offset : end]
                self._buffer_offset = end
                self._pos += len(line)
                return line
        return io.BufferedIOBase.readline(self, size)
def _pipe_stdin(self, stdin):
    """Copy *stdin* data into the temporary stdin file.

    Accepts None / io.FileIO (no piping needed), a buffered binary
    stream (streamed in chunks), or str/bytes (written in slices).
    Raises ValueError for anything else.
    """
    if stdin is None or isinstance(stdin, io.FileIO):
        # Nothing to pipe: real files are handed to the child directly.
        return None
    tsi = self._temp_stdin
    bufsize = self.bufsize
    if isinstance(stdin, io.BufferedIOBase):
        # Stream in bufsize chunks until EOF, flushing after each write
        # so the consumer sees the data promptly.
        buf = stdin.read(bufsize)
        while len(buf) != 0:
            tsi.write(buf)
            tsi.flush()
            buf = stdin.read(bufsize)
    elif isinstance(stdin, (str, bytes)):
        # Normalize to bytes, then write in bufsize-sized slices.
        raw = stdin.encode() if isinstance(stdin, str) else stdin
        for i in range((len(raw)//bufsize) + 1):
            tsi.write(raw[i*bufsize:(i + 1)*bufsize])
            tsi.flush()
    else:
        raise ValueError('stdin not understood {0!r}'.format(stdin))
def write(fobj, content, convert=True):
    """
    Utility function used to write content to a file.
    Allow compatibility between Python versions.
    """
    # This function automatically converts strings to bytes
    # if running under Python 3. Otherwise we cannot write
    # to a file.
    # A file-like object advertises binary mode via its 'mode' string;
    # otherwise fall back to checking for io.BufferedIOBase.
    if hasattr(fobj, 'mode'):
        needs_bytes = 'b' in fobj.mode
    else:
        needs_bytes = isinstance(fobj, io.BufferedIOBase)
    # Under Python 3, encode text before writing to a binary target.
    if sys.version_info[:2] >= (3, 0) and convert and needs_bytes:  # pragma: no cover
        content = bytes(content, 'utf-8')
    fobj.write(content)
def __init__(self, buffer):
    """Store the underlying buffer for later reads/writes.

    :param buffer: Buffer
    :type buffer: io.BufferedIOBase
    """
    self.buffer = buffer
def _gettextwriter(out, encoding):
    """Return an unbuffered text writer over *out* using *encoding*.

    *out* may be None (falls back to sys.stdout), a raw binary stream,
    or any object with a ``write`` method. The result encodes text with
    ``xmlcharrefreplace`` error handling.
    """
    if out is None:
        import sys
        out = sys.stdout
    if isinstance(out, io.RawIOBase):
        # FIX: the original did `buffer = io.BufferedIOBase(out)`, but
        # io.BufferedIOBase takes no constructor arguments (TypeError)
        # and would not delegate to `out` even if it did. Proxy attribute
        # access to the raw stream instead, and keep the original file
        # open when the TextIOWrapper is destroyed.
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
def output(self, text, highlighter=None):
    """Send text to the environment's output stream.

    :param text: the text to output
    :param highlighter: an optional function to colourize the text
    """
    if not self.has_pipe:
        # Colourize only when writing to a terminal, never into a pipe.
        colourize = highlighter or (lambda s: s)
        text = colourize(text)
    line = "{}\n".format(text)
    stream = self.output_stream
    if isinstance(stream, io.BufferedIOBase):
        # Binary streams require encoded bytes, not str.
        line = bytes(line, encoding='utf-8')
    stream.write(line)
    stream.flush()
def test_io_buffered_by_default(self):
    """Child pipe endpoints should be buffered binary streams by default."""
    p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    try:
        # With the default bufsize, Popen wraps each pipe fd in a
        # buffered (io.BufferedIOBase) object rather than a raw one.
        self.assertIsInstance(p.stdin, io.BufferedIOBase)
        self.assertIsInstance(p.stdout, io.BufferedIOBase)
        self.assertIsInstance(p.stderr, io.BufferedIOBase)
    finally:
        # Close all pipe ends and reap the child to avoid leaks.
        p.stdin.close()
        p.stdout.close()
        p.stderr.close()
        p.wait()
def _gettextwriter(out, encoding):
    """Return a text writer over *out* using *encoding*.

    None maps to sys.stdout; text streams pass through unchanged;
    binary/duck-typed writers are wrapped in a write-through
    TextIOWrapper with ``xmlcharrefreplace`` error handling.
    """
    if out is None:
        import sys
        return sys.stdout
    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out
    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            # Masquerade as the wrapped type; delegate everything to it.
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
def __init__(self, filename, mode="rb", compresslevel=9):
    """Open a zlib-compressed stream over *filename*.

    :param filename: path (str/bytes) or an object with read/write.
    :param mode: "rb" to decompress on read, "wb" to compress on write.
    :param compresslevel: zlib level, 1 (fastest) to 9 (best).
    :raises ValueError: for a bad compresslevel or mode.
    :raises TypeError: if filename is neither a path nor file-like.
    """
    # This lock must be recursive, so that BufferedIOBase's
    # readline(), readlines() and writelines() don't deadlock.
    self._lock = RLock()
    self._fp = None
    self._closefp = False  # whether close() should also close self._fp
    self._mode = _MODE_CLOSED
    self._pos = 0  # logical position in the uncompressed stream
    self._size = -1  # uncompressed size, unknown until EOF
    if not isinstance(compresslevel, int) or not (1 <= compresslevel <= 9):
        raise ValueError("'compresslevel' must be an integer "
                         "between 1 and 9. You provided 'compresslevel={}'"
                         .format(compresslevel))
    if mode == "rb":
        mode_code = _MODE_READ
        # self.wbits is expected to be defined by the class/subclass.
        self._decompressor = zlib.decompressobj(self.wbits)
        self._buffer = b""  # decompressed-but-unread bytes
        self._buffer_offset = 0
    elif mode == "wb":
        mode_code = _MODE_WRITE
        self._compressor = zlib.compressobj(compresslevel, zlib.DEFLATED,
                                            self.wbits, zlib.DEF_MEM_LEVEL, 0)
    else:
        raise ValueError("Invalid mode: %r" % (mode,))
    if isinstance(filename, _basestring):
        # Path given: we own the underlying file object.
        self._fp = io.open(filename, mode)
        self._closefp = True
        self._mode = mode_code
    elif hasattr(filename, "read") or hasattr(filename, "write"):
        # File-like object given: caller retains ownership.
        self._fp = filename
        self._mode = mode_code
    else:
        raise TypeError("filename must be a str or bytes object, "
                        "or a file")
def readinto(self, b):
    """Read up to len(b) bytes into b.

    Returns the number of bytes read (0 for EOF).
    """
    with self._lock:
        # Delegate to the generic implementation, which calls this
        # object's read() under the already-held (recursive) lock.
        return io.BufferedIOBase.readinto(self, b)
def is_fileobj(f):
    """Return True if *f* is a file object such as one created by ``open()``.

    The original chained isinstance checks against io.TextIOBase,
    io.BufferedIOBase and io.RawIOBase, but all three are subclasses of
    io.IOBase, so a single check is equivalent and clearer.
    """
    return isinstance(f, io.IOBase)
def _gettextwriter(out, encoding):
    """Return an unbuffered text writer over *out* using *encoding*.

    None maps to sys.stdout; raw binary streams are proxied; objects
    with only a ``write`` method are adapted via a BufferedIOBase shim.
    """
    if out is None:
        import sys
        out = sys.stdout
    if isinstance(out, io.RawIOBase):
        # FIX: the original did `buffer = io.BufferedIOBase(out)`, but
        # io.BufferedIOBase takes no constructor arguments (TypeError)
        # and would not delegate to `out` even if it did. Proxy attribute
        # access to the raw stream instead, and keep the original file
        # open when the TextIOWrapper is destroyed.
        class _wrapper:
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                    errors='xmlcharrefreplace',
                                    newline='\n')
# PyPy: moved this class outside the function above
def readlines(self, size=-1):
    """Read a list of lines of uncompressed bytes from the file.

    size can be specified to control the number of lines read: no
    further lines will be read once the total size of the lines read
    so far equals or exceeds size.
    """
    # Accept any object providing __index__ (PEP 357).
    if not isinstance(size, int):
        if not hasattr(size, "__index__"):
            raise TypeError("Integer argument expected")
        size = size.__index__()
    with self._lock:
        # Generic implementation loops over readline() under the
        # already-held (recursive) lock.
        return io.BufferedIOBase.readlines(self, size)
def writelines(self, seq):
    """Write a sequence of byte strings to the file.

    Returns the number of uncompressed bytes written.
    seq can be any iterable yielding byte strings.

    Line separators are not added between the written byte strings.
    """
    with self._lock:
        # Generic implementation calls write() per item under the
        # already-held (recursive) lock.
        return io.BufferedIOBase.writelines(self, seq)

# Rewind the file to the beginning of the data stream.
def _gettextwriter(out, encoding):
    """Return a text writer over *out* using *encoding*.

    None maps to sys.stdout; text streams and codecs stream writers
    pass through unchanged; binary/duck-typed writers are wrapped in a
    write-through TextIOWrapper with ``xmlcharrefreplace`` handling.
    """
    if out is None:
        import sys
        return sys.stdout
    if isinstance(out, io.TextIOBase):
        # use a text writer as is
        return out
    if isinstance(out, (codecs.StreamWriter, codecs.StreamReaderWriter)):
        # use a codecs stream writer as is
        return out
    # wrap a binary writer with TextIOWrapper
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        class _wrapper:
            # Masquerade as the wrapped type; delegate everything to it.
            __class__ = out.__class__
            def __getattr__(self, name):
                return getattr(out, name)
        buffer = _wrapper()
        buffer.close = lambda: None
    else:
        # This is to handle passed objects that aren't in the
        # IOBase hierarchy, but just have a write method
        buffer = io.BufferedIOBase()
        buffer.writable = lambda: True
        buffer.write = out.write
        try:
            # TextIOWrapper uses this methods to determine
            # if BOM (for UTF-16, etc) should be added
            buffer.seekable = out.seekable
            buffer.tell = out.tell
        except AttributeError:
            pass
    return io.TextIOWrapper(buffer, encoding=encoding,
                            errors='xmlcharrefreplace',
                            newline='\n',
                            write_through=True)
def _gettextwriter(out, encoding): if out is None: import sys out = sys.stdout if isinstance(out, io.RawIOBase): buffer = io.BufferedIOBase(out) # Keep the original file open when the TextIOWrapper is # destroyed buffer.close = lambda: None else: # This is to handle passed objects that aren't in the # IOBase hierarchy, but just have a write method buffer = io.BufferedIOBase() buffer.writable = lambda: True buffer.write = out.write try: # TextIOWrapper uses this methods to determine # if BOM (for UTF-16, etc) should be added buffer.seekable = out.seekable buffer.tell = out.tell except AttributeError: pass # wrap a binary writer with TextIOWrapper class UnbufferedTextIOWrapper(io.TextIOWrapper): def write(self, s): super(UnbufferedTextIOWrapper, self).write(s) self.flush() return UnbufferedTextIOWrapper(buffer, encoding=encoding, errors='xmlcharrefreplace', newline='\n')
def _stdin_filename(self, stdin): if stdin is None: rtn = None elif isinstance(stdin, io.FileIO) and os.path.isfile(stdin.name): rtn = stdin.name elif isinstance(stdin, (io.BufferedIOBase, str, bytes)): self._temp_stdin = tsi = tempfile.NamedTemporaryFile() rtn = tsi.name else: raise ValueError('stdin not understood {0!r}'.format(stdin)) return rtn
def download_into(self, file: str, buffer: Union[BufferedIOBase, BytesIO]=None, remote_path: str=None) -> IOBase:
    """
    Download a file from your STACK account
    :param file: File name to download
    :param remote_path: Path to find the file in
    :param buffer: Buffer to download into (BytesIO, StringIO, file pointer)
    :return: BytesIO buffer
    """
    if not remote_path:
        # Default to the session's current working directory.
        remote_path = self.__cwd
    file = join(remote_path, file.lstrip("/"))
    if not buffer:
        # No buffer supplied: download into a fresh in-memory buffer.
        buffer = BytesIO()
    if not isinstance(buffer, BufferedIOBase):
        raise StackException("Download buffer must be a binary IO type, please use BytesIO or open your file in 'rb' mode.")
    try:
        self.webdav.download_to(buffer, file.lstrip("/"))
        # Rewind so the caller can read from the start immediately.
        buffer.seek(0)
        return buffer
    except WebDavException as e:
        # Re-raise as the library's own exception type.
        raise StackException(e)
def readline(self, limit=-1):
    """Read and return a line from the stream.

    If limit is specified, at most limit bytes will be read.
    In universal-newline mode, '\r', '\n' and '\r\n' are all
    translated to a single '\n' and recorded in self.newlines.
    """
    if not self._universal and limit < 0:
        # Shortcut common case - newline found in buffer.
        i = self._readbuffer.find('\n', self._offset) + 1
        if i > 0:
            line = self._readbuffer[self._offset: i]
            self._offset = i
            return line

    if not self._universal:
        # No translation needed: defer to the generic implementation.
        return io.BufferedIOBase.readline(self, limit)

    line = ''
    while limit < 0 or len(line) < limit:
        readahead = self.peek(2)
        if readahead == '':
            # EOF: return whatever has been accumulated.
            return line

        #
        # Search for universal newlines or line chunks.
        #
        # The pattern returns either a line chunk or a newline, but not
        # both. Combined with peek(2), we are assured that the sequence
        # '\r\n' is always retrieved completely and never split into
        # separate newlines - '\r', '\n' due to coincidental readaheads.
        #
        match = self.PATTERN.search(readahead)
        newline = match.group('newline')
        if newline is not None:
            # Record each distinct newline style encountered.
            if self.newlines is None:
                self.newlines = []
            if newline not in self.newlines:
                self.newlines.append(newline)
            self._offset += len(newline)
            return line + '\n'

        chunk = match.group('chunk')
        if limit >= 0:
            # Truncate the chunk so the total stays within limit.
            chunk = chunk[: limit - len(line)]
        self._offset += len(chunk)
        line += chunk

    return line
def _prep_binary_content(self): ''' Sets delivery method of either payload or header Favors Content-Location header if set Args: None Returns: None: sets attributes in self.binary and headers ''' # nothing present if not self.data and not self.location and 'Content-Location' not in self.resource.headers.keys(): raise Exception('creating/updating NonRDFSource requires content from self.binary.data, self.binary.location, or the Content-Location header') elif 'Content-Location' in self.resource.headers.keys(): logger.debug('Content-Location header found, using') self.delivery = 'header' # if Content-Location is not set, look for self.data_location then self.data elif 'Content-Location' not in self.resource.headers.keys(): # data_location set, trumps Content self.data if self.location: # set appropriate header self.resource.headers['Content-Location'] = self.location self.delivery = 'header' # data attribute is plain text, binary, or file-like object elif self.data: # if file-like object, set flag for api.http_request if isinstance(self.data, io.BufferedIOBase): logger.debug('detected file-like object') self.delivery = 'payload' # else, just bytes else: logger.debug('detected bytes') self.delivery = 'payload'
def readline(self, limit=-1):
    """Read and return a line from the stream.

    If limit is specified, at most limit bytes will be read.
    In universal-newline mode, b'\r', b'\n' and b'\r\n' are all
    translated to a single b'\n' and recorded in self.newlines.
    """
    if not self._universal and limit < 0:
        # Shortcut common case - newline found in buffer.
        i = self._readbuffer.find(b'\n', self._offset) + 1
        if i > 0:
            line = self._readbuffer[self._offset: i]
            self._offset = i
            return line

    if not self._universal:
        # No translation needed: defer to the generic implementation.
        return io.BufferedIOBase.readline(self, limit)

    line = b''
    while limit < 0 or len(line) < limit:
        readahead = self.peek(2)
        if readahead == b'':
            # EOF: return whatever has been accumulated.
            return line

        #
        # Search for universal newlines or line chunks.
        #
        # The pattern returns either a line chunk or a newline, but not
        # both. Combined with peek(2), we are assured that the sequence
        # '\r\n' is always retrieved completely and never split into
        # separate newlines - '\r', '\n' due to coincidental readaheads.
        #
        match = self.PATTERN.search(readahead)
        newline = match.group('newline')
        if newline is not None:
            # Record each distinct newline style encountered.
            if self.newlines is None:
                self.newlines = []
            if newline not in self.newlines:
                self.newlines.append(newline)
            self._offset += len(newline)
            return line + b'\n'

        chunk = match.group('chunk')
        if limit >= 0:
            # Truncate the chunk so the total stays within limit.
            chunk = chunk[: limit - len(line)]
        self._offset += len(chunk)
        line += chunk

    return line