我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zlib.MAX_WBITS。
def icompressed2ibytes(source): """ :param source: GENERATOR OF COMPRESSED BYTES :return: GENERATOR OF BYTES """ decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS) last_bytes_count = 0 # Track the last byte count, so we do not show too many debug lines bytes_count = 0 for bytes_ in source: try: data = decompressor.decompress(bytes_) except Exception as e: Log.error("problem", cause=e) bytes_count += len(data) if Math.floor(last_bytes_count, 1000000) != Math.floor(bytes_count, 1000000): last_bytes_count = bytes_count if DEBUG: Log.note("bytes={{bytes}}", bytes=bytes_count) yield data
def decompress(self, data): if not data: return data if not self._first_try: return self._obj.decompress(data) self._data += data try: return self._obj.decompress(data) except zlib.error: self._first_try = False self._obj = zlib.decompressobj(-zlib.MAX_WBITS) try: return self.decompress(self._data) finally: self._data = None
def content(self): """Raw content of response (i.e. bytes). :returns: Body of HTTP response :rtype: :class:`str` """ if not self._content: # Decompress gzipped content if self._gzipped: decoder = zlib.decompressobj(16 + zlib.MAX_WBITS) self._content = decoder.decompress(self.raw.read()) else: self._content = self.raw.read() self._content_loaded = True return self._content
def decompress(self, data): if not data: return data if not self._first_try: return self._obj.decompress(data) self._data += data try: decompressed = self._obj.decompress(data) if decompressed: self._first_try = False self._data = None return decompressed except zlib.error: self._first_try = False self._obj = zlib.decompressobj(-zlib.MAX_WBITS) try: return self.decompress(self._data) finally: self._data = None
def content(self): """Raw content of response (i.e. bytes) :returns: Body of HTTP response :rtype: :class:`str` """ if not self._content: # Decompress gzipped content if self._gzipped: decoder = zlib.decompressobj(16 + zlib.MAX_WBITS) self._content = decoder.decompress(self.raw.read()) else: self._content = self.raw.read() self._content_loaded = True return self._content
def _decompressContent(response, new_content): content = new_content try: encoding = response.get('content-encoding', None) if encoding in ['gzip', 'deflate']: if encoding == 'gzip': content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read() if encoding == 'deflate': content = zlib.decompress(content, -zlib.MAX_WBITS) response['content-length'] = str(len(content)) # Record the historical presence of the encoding in a way the won't interfere. response['-content-encoding'] = response['content-encoding'] del response['content-encoding'] except IOError: content = "" raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content) return content
def _decompressContent(response, new_content): content = new_content try: encoding = response.get('content-encoding', None) if encoding in ['gzip', 'deflate']: if encoding == 'gzip': content = gzip.GzipFile(fileobj=io.BytesIO(new_content)).read() if encoding == 'deflate': content = zlib.decompress(content, -zlib.MAX_WBITS) response['content-length'] = str(len(content)) # Record the historical presence of the encoding in a way the won't interfere. response['-content-encoding'] = response['content-encoding'] del response['content-encoding'] except IOError: content = "" raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content) return content
def getDanmu(cid): if not cid: return "?????" try: cid_url = "http://comment.bilibili.com/%s.xml" % cid danmu_xml = urllib.request.urlopen(cid_url).read() xml = zlib.decompressobj(-zlib.MAX_WBITS).decompress(danmu_xml).decode() # ???????? return xml # ????? except Exception: pass # ??xml??????????????
def content(self): """Raw content of response (i.e. bytes). :returns: Body of HTTP response :rtype: str """ if not self._content: # Decompress gzipped content if self._gzipped: decoder = zlib.decompressobj(16 + zlib.MAX_WBITS) self._content = decoder.decompress(self.raw.read()) else: self._content = self.raw.read() self._content_loaded = True return self._content