我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用zlib.decompressobj()。
def icompressed2ibytes(source):
    """
    Lazily inflate a stream of compressed chunks, one output per input chunk.

    :param source: GENERATOR OF COMPRESSED BYTES
    :return: GENERATOR OF BYTES
    """
    # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
    inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)

    # `reported` is the running total at the time of the last progress note;
    # comparing the two floored totals keeps the debug output sparse.
    reported = 0
    total = 0
    for chunk in source:
        try:
            data = inflater.decompress(chunk)
        except Exception as e:
            # NOTE(review): presumably Log.error raises; if it returned,
            # `data` below would be stale or unbound - confirm.
            Log.error("problem", cause=e)
        total += len(data)
        if Math.floor(reported, 1000000) != Math.floor(total, 1000000):
            reported = total
            if DEBUG:
                Log.note("bytes={{bytes}}", bytes=total)
        yield data
def decompress(self, data):
    """
    Inflate ``data``, falling back to raw deflate if zlib rejects the input.

    While detection is still open (``self._first_try``), each chunk is fed to
    ``self._obj`` and also kept in ``self._data``. If ``zlib.error`` is
    raised, ``self._obj`` is replaced by a raw-deflate decompressor (negative
    ``wbits``) and the buffered input is replayed through it. Detection is
    closed as soon as the first decompressor yields output, so the replay
    buffer stops growing and later errors are no longer mistaken for a
    format mismatch.

    :param data: next chunk of compressed bytes; a falsy chunk is returned
        unchanged
    :returns: the decompressed bytes available for the input seen so far
    :raises zlib.error: if the input is invalid once detection has closed,
        including during the raw-deflate replay
    """
    if not data:
        return data

    if not self._first_try:
        return self._obj.decompress(data)

    # Keep a copy of the input so it can be replayed on a format mismatch.
    self._data += data
    try:
        decompressed = self._obj.decompress(data)
        if decompressed:
            # Real output proves the current decompressor understands the
            # stream: stop buffering and release the replay copy.
            self._first_try = False
            self._data = None
        return decompressed
    except zlib.error:
        self._first_try = False
        # Negative wbits selects a raw deflate stream (no zlib header).
        self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
        try:
            return self.decompress(self._data)
        finally:
            self._data = None
def content(self):
    """Return the raw body of the response (i.e. bytes), reading it on demand.

    The body is read from ``self.raw`` the first time it is needed and then
    kept in ``self._content``; gzipped responses are inflated first.

    :returns: Body of HTTP response
    :rtype: :class:`str`

    """
    # A truthy cached body short-circuits everything else.
    if self._content:
        return self._content

    if self._gzipped:
        # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
        inflater = zlib.decompressobj(16 + zlib.MAX_WBITS)
        body = inflater.decompress(self.raw.read())
    else:
        body = self.raw.read()

    self._content = body
    self._content_loaded = True
    return self._content
def decompress(self, data):
    """
    Inflate ``data``, switching to raw deflate if zlib rejects the input.

    Until the first decompressor has produced output, every chunk is also
    buffered in ``self._data`` so that it can be replayed through a
    raw-deflate decompressor when ``zlib.error`` is raised.

    :param data: next chunk of compressed bytes; a falsy chunk is returned
        unchanged
    :returns: the decompressed bytes available for the input seen so far
    """
    if not data:
        return data

    if self._first_try:
        # Still detecting the format: remember the input for a replay.
        self._data += data
        try:
            output = self._obj.decompress(data)
        except zlib.error:
            self._first_try = False
            # Negative wbits selects a raw deflate stream (no zlib header).
            self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
            try:
                return self.decompress(self._data)
            finally:
                self._data = None
        if output:
            # Output was produced, so detection is over; drop the buffer.
            self._first_try = False
            self._data = None
        return output

    # Format already settled: plain pass-through.
    return self._obj.decompress(data)
def content(self):
    """Raw response body (i.e. bytes), loaded on first access.

    :returns: Body of HTTP response
    :rtype: :class:`str`

    """
    if not self._content:
        # Check the flag before touching the stream, then read it once.
        is_gzipped = self._gzipped
        raw_bytes = self.raw.read()
        if is_gzipped:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            raw_bytes = zlib.decompressobj(16 + zlib.MAX_WBITS).decompress(raw_bytes)
        self._content = raw_bytes
        self._content_loaded = True

    return self._content
def binary_to_term(data: bytes, options: dict = None):
    """ Strip 131 header and unpack if the data was compressed.

        Layout as read by this function: ``data[0]`` is the version tag,
        ``data[1]`` may be the compression tag, in which case the expected
        decompressed size is read with ``util.u32(data, 2)`` and the zlib
        stream starts at ``data[6]``.

        :param data: The incoming encoded data with the 131 byte
        :param options: See description on top of the module. When omitted,
            a fresh empty dict is created for each call.
        :raises ETFDecodeException: when the tag is not 131, or when the
            decompressed length differs from the declared size (e.g. the
            compressed data was truncated). An error raised by zlib itself
            for a corrupt stream is not caught here and propagates as-is.
    """
    # A literal ``{}`` default would be one dict shared by every call and
    # handed on to binary_to_term_2; build a new one per call instead.
    if options is None:
        options = {}

    if data[0] != ETF_VERSION_TAG:
        raise ETFDecodeException("Unsupported external term version")

    if data[1] == TAG_COMPRESSED:
        do = decompressobj()
        decomp_size = util.u32(data, 2)
        # flush() collects whatever output decompress() has not returned yet.
        decomp = do.decompress(data[6:]) + do.flush()
        if len(decomp) != decomp_size:
            # Data corruption?
            raise ETFDecodeException("Compressed size mismatch with actual")

        return binary_to_term_2(decomp, options)

    return binary_to_term_2(data[1:], options)
def __init__(self, stream):
    """
    Wrap the given stream in an unzipper.
    """
    # The underlying stream we pull data from.
    self.stream = stream

    # Adding 16 to the window bits makes the decompressor accept a gzip
    # header. See <http://stackoverflow.com/a/22311297/402891>.
    # This is replaced by None once decompression is done and flushed.
    self.decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)

    # Tri-state "is the input really compressed?" flag: None until the first
    # block has been attempted, then True if it was read successfully or
    # False if it was not (no point decompressing further blocks then).
    self.header_read = None

    # Line splitting is done here, so partial data is held in a buffer.
    self.buffer = ""

    # Throughput counters.
    self.compressed_bytes = self.uncompressed_bytes = 0
def getDanmu(cid):
    """Fetch the comment XML for ``cid`` from comment.bilibili.com.

    The response body is inflated as a raw deflate stream (negative
    ``wbits``) and decoded with the default codec.

    :param cid: value interpolated into the ``<cid>.xml`` URL
    :returns: the decoded XML text; the placeholder string below when
        ``cid`` is falsy; ``None`` when fetching, inflating or decoding fails
    """
    if not cid:
        # NOTE(review): this placeholder looks mis-encoded in the source;
        # it is runtime text, so it is kept byte-for-byte.
        return "?????"
    try:
        cid_url = "http://comment.bilibili.com/%s.xml" % cid
        # Use the response as a context manager so the connection is closed
        # even if read() fails.
        with urllib.request.urlopen(cid_url) as response:
            danmu_xml = response.read()
        xml = zlib.decompressobj(-zlib.MAX_WBITS).decompress(danmu_xml).decode()
        return xml
    except Exception:
        # Deliberate best-effort: any failure means "no comments available".
        return None
def content(self):
    """Return the raw response body (i.e. bytes), fetching it if necessary.

    :returns: Body of HTTP response
    :rtype: str

    """
    if not self._content:
        if not self._gzipped:
            # Plain body: take the stream contents as they are.
            self._content = self.raw.read()
        else:
            # 16 + MAX_WBITS makes zlib expect a gzip header and trailer.
            gunzip = zlib.decompressobj(16 + zlib.MAX_WBITS)
            self._content = gunzip.decompress(self.raw.read())

        self._content_loaded = True

    return self._content