我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用zlib.DEFLATED。
def gzip_encode_add_padding(content):
    """Build one gzip stream from ``content`` plus replayed and filler data.

    The same level-9 gzip compressor (``wbits = MAX_WBITS | 16``) is fed, in
    order: ``content`` itself, the result of ``reorder`` applied to the whole
    chunks of ``content``, filler chunks of ``b"A"``, and the final
    ``10 * CHUNK_SIZE`` bytes of ``content``.

    Relies on the module-level names ``CHUNK_SIZE``, ``WRAP_SIZE``,
    ``arg_blocks`` and ``reorder``.  Progress messages go to stdout.

    :param content: byte string to embed in the stream.
    :return: the complete gzip stream as a byte string.
    """
    print("* Compressing content...")
    # Floor division: a trailing partial chunk is deliberately ignored, and
    # the result has to stay an int to be usable as a slice bound.
    num_chunks = len(content) // CHUNK_SIZE
    gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    data = gzip_compress.compress(content)
    comp_cnt = 0

    replay = reorder(content[0:num_chunks * CHUNK_SIZE], arg_blocks)
    assert len(replay) % CHUNK_SIZE == 0
    num_chunks = len(replay) // CHUNK_SIZE  # update the blocks

    print("** Duplicating content (CBC attack)...")
    # duplicate cipher, should result in duplicate plaintext
    # (prefixed by some garbage)
    data += gzip_compress.compress(replay)

    # The compressor only accepts bytes, so the filler is a bytes literal.
    filler = b"A" * CHUNK_SIZE
    while comp_cnt < WRAP_SIZE - (num_chunks * CHUNK_SIZE + 10 * CHUNK_SIZE):
        data += gzip_compress.compress(filler)
        comp_cnt += CHUNK_SIZE

    print("** Copy original padding...")
    # copy valid PKCS7 padding
    data += gzip_compress.compress(
        content[len(content) - 10 * CHUNK_SIZE:len(content)])
    data = data + gzip_compress.flush()
    print("*** Finished")
    return data
def compress(filename, input, output):
    """Write ``input`` to ``output`` as a gzip member named after ``filename``.

    The header (magic, method, FNAME flag, mtime, XFL, OS, file name) is
    emitted by hand, the payload is a raw deflate stream (negative ``wbits``),
    and the trailer holds the CRC-32 and the size, both written through the
    module's ``write32`` helper.

    :param filename: path of the file being compressed; it is ``os.stat``-ed
        for the modification time and size and stored in the header.
    :param input: binary file object the uncompressed data is read from.
    :param output: binary file object the gzip stream is written to.
    """
    output.write(b'\037\213\010')            # Write the header, ...
    output.write(bytes(bytearray([FNAME])))  # ... flag byte ...

    statval = os.stat(filename)              # ... modification time ...
    mtime = statval[8]
    write32(output, mtime)
    output.write(b'\002')                    # ... slowest compression alg. ...
    output.write(b'\377')                    # ... OS (=unknown) ...

    # The stream is binary, so a text path must be turned into bytes first.
    # os.fsencode only exists on Python 3; on Python 2 str is already bytes.
    fsencode = getattr(os, 'fsencode', None)
    name = fsencode(filename) if fsencode is not None else filename
    output.write(name + b'\000')             # ... original filename ...

    crcval = zlib.crc32(b"")
    compobj = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                               zlib.DEF_MEM_LEVEL, 0)
    while True:
        data = input.read(1024)
        # Truthiness works for both str and bytes; comparing with "" never
        # matches an empty bytes object and would loop forever.
        if not data:
            break
        crcval = zlib.crc32(data, crcval)
        output.write(compobj.compress(data))
    output.write(compobj.flush())
    write32(output, crcval)                  # ... the CRC ...
    write32(output, statval[6])              # and the file size.
def deflate(data, compresslevel=9):
    """Deflate ``data`` and wrap it in the obfuscated container format.

    The payload is compressed as a raw deflate stream (negative ``wbits``),
    prefixed with an 8-byte header made of four magic bytes followed by four
    bytes derived from the negated compressed length via the module's ``byte``
    helper, and the whole buffer is finally passed through ``xor``.

    :param data: bytes to compress.
    :param compresslevel: zlib compression level (default 9).
    :return: whatever ``xor`` returns for header + compressed payload.
    """
    # Compress
    packer = zlib.compressobj(compresslevel, zlib.DEFLATED, -zlib.MAX_WBITS,
                              zlib.DEF_MEM_LEVEL, 0)
    payload = packer.compress(data) + packer.flush()

    # Add PDs compression magic and negated file length (8 bytes)
    negated_length = -len(payload)
    header = bytearray(b'\xc5\xee\xf7\xff\x00\x00\x00\x00')
    # Header slots 4..7 take the length shifted right by 0, 8, 16 and 24 bits.
    for slot, shift in enumerate((0, 8, 0x10, 0x18), 4):
        header[slot] = byte(negated_length >> shift, 0)

    return xor(bytes(header) + payload)


# Decrypts the GT6TED, removes the magic and negated file length and
# decompresses the data
def compress_gzip(data):
    """Return ``data`` as a gzip stream, or ``None`` if compression fails.

    Uses level 9 with ``wbits = MAX_WBITS | 16`` so that zlib emits a gzip
    header and trailer.  An exception raised while compressing is reported
    through ``print_error`` and swallowed.
    """
    packer = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    try:
        return packer.compress(data) + packer.flush()
    except Exception as e:
        print_error('Error when compressing with Gzip: {0}'.format(e))
    return None


# def decompress_gzip(data):
#     decompressed=None
#     try:
#         decompressed = zlib.decompress(gzip_data, zlib.MAX_WBITS|16)
#     except:
#         pass
#     return decompressed
def compress_readable_output(src_file, compress_level=6):
    """Yield the content of ``src_file`` as the chunks of a gzip stream.

    The payload is a raw deflate stream (negative ``wbits``).  The bytes
    returned by the module's ``gzip_prefix()`` (presumably the gzip header)
    are prepended to the first chunk, and the last chunk ends with the CRC-32
    and the input size, each packed as a little-endian unsigned 32-bit value.

    :param src_file: object with ``read(n)`` returning bytes.
    :param compress_level: zlib compression level (default 6).
    :return: generator of byte strings.
    """
    crc = zlib.crc32(b"")
    size = 0
    zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS,
                            zlib.DEF_MEM_LEVEL, zlib.Z_DEFAULT_STRATEGY)
    prefix_written = False
    while True:
        data = src_file.read(DEFAULT_BUFFER_SIZE)
        if not data:
            break
        size += len(data)
        crc = zlib.crc32(data, crc)
        data = zobj.compress(data)
        if not prefix_written:
            prefix_written = True
            data = gzip_prefix() + data
        yield data

    # Both trailer fields are 32-bit: mask the CRC (it may be signed on
    # Python 2) and reduce the size modulo 2**32 so packing cannot overflow.
    tail = zobj.flush() + struct.pack(b"<LL", crc & 0xffffffff,
                                      size & 0xffffffff)
    if not prefix_written:
        # Empty input: the loop body never ran, so the prefix is still due.
        tail = gzip_prefix() + tail
    yield tail
def test_recording_gzipped_responses_as_text(vts_rec_on, httpserver):
    """A gzip-encoded response body is recorded in the cassette as text."""
    expected_text = "Hello!"

    # http://stackoverflow.com/a/22310760
    packer = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    body = packer.compress(expected_text.encode()) + packer.flush()
    httpserver.serve_content(
        body, 200, headers={"Content-Encoding": "gzip"})

    url = "{}/".format(httpserver.url)
    resp = requests.get(url)
    assert resp.status_code == 200
    assert resp.text == expected_text

    # Exactly one exchange must have been recorded.
    assert len(vts_rec_on.cassette) == 1
    track = vts_rec_on.cassette[0]
    assert track['request']['url'] == url
    assert "Content-Encoding" in track['response']['headers']
    assert track['response']['body'] == expected_text


# enable pytester fixture which allows running pytests within tests
def start_compressing(self):
    """Create the deflate compressor/decompressor pair (RFC 4978).

    Stores a decompressor in ``self.decompressor`` and a default-level
    compressor in ``self.compressor``.
    """
    # rfc 1951 - pure DEFLATE, so use -15 for both windows
    raw_wbits = -15
    self.decompressor = zlib.decompressobj(raw_wbits)
    self.compressor = zlib.compressobj(
        zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, raw_wbits)
def _create_compressor(self):
    """Return a new raw-deflate compressor.

    The level is tornado's ``GZipContentEncoding.GZIP_LEVEL``; the window
    size is ``self._max_wbits``, negated so that zlib produces a raw stream.
    """
    level = tornado.web.GZipContentEncoding.GZIP_LEVEL
    raw_wbits = -self._max_wbits
    return zlib.compressobj(level, zlib.DEFLATED, raw_wbits)
def write_response(handler, code, headers, data=""):
    """Send an HTTP response through ``handler``, gzipping the body when that
    makes it smaller.

    :param handler: object exposing ``send_response``, ``send_header``,
        ``end_headers`` and a writable ``wfile``.
    :param code: HTTP status code to send.
    :param headers: iterable of ``"Name:value"`` strings.  Each one is split
        at its first colon; everything after the colon is sent unchanged as
        the header value.
    :param data: response body; an empty value means "no body".
    :raises ValueError: if a header string contains no colon.
    """
    handler.send_response(code)
    for header in headers:
        name, value = header.split(":", 1)
        handler.send_header(name, value)

    content = data
    if data:
        zlib_encode = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
        compressed = zlib_encode.compress(data) + zlib_encode.flush()
        # Only advertise gzip when it actually saves bytes.
        if len(compressed) < len(data):
            handler.send_header('Content-Encoding', 'gzip')
            handler.send_header('Content-Length', len(compressed))
            content = compressed

    # The header section has to be terminated even when there is no body.
    handler.end_headers()
    if content:
        handler.wfile.write(content)
def __init__(self, im):
    """Build the tag (type 36, DefineBitsLossless2) from an image array.

    The image is converted to a 4-channel ``uint8`` ARGB array, serialized
    and zlib-compressed into ``self._data``; the original shape is kept in
    ``self.imshape``.

    :param im: array whose ``shape`` is (rows, cols), (rows, cols, 3) or
        (rows, cols, 4); in the last case channel 3 is used as alpha.
    :raises ValueError: for any other shape.
    """
    DefinitionTag.__init__(self)
    self.tagtype = 36  # DefineBitsLossless2

    # convert image (note that format is ARGB)
    # even a grayscale image is stored in ARGB, nevertheless,
    # the fabulous deflate compression will make it that not much
    # more data is required for storing (25% or so, and less than 10%
    # when storing RGB as ARGB).
    ndim = len(im.shape)
    if ndim == 3 and im.shape[2] in (3, 4):
        tmp = np.ones((im.shape[0], im.shape[1], 4), dtype=np.uint8) * 255
        tmp[:, :, 1:4] = im[:, :, :3]
        if im.shape[2] == 4:
            tmp[:, :, 0] = im[:, :, 3]  # swap channel where alpha is
    elif ndim == 2:
        tmp = np.ones((im.shape[0], im.shape[1], 4), dtype=np.uint8) * 255
        # Replicate the single gray channel into R, G and B.
        tmp[:, :, 1:4] = im[:, :, np.newaxis]
    else:  # pragma: no cover
        raise ValueError("Invalid shape to be an image.")

    # we changed the image to uint8 4 channels.
    # now compress!
    # tobytes() replaces the deprecated tostring() alias (same output).
    # NOTE: the second argument of zlib.compress is the compression level;
    # zlib.DEFLATED has the value 8, so this compresses at level 8.
    self._data = zlib.compress(tmp.tobytes(), zlib.DEFLATED)
    self.imshape = im.shape
def __init__(self, level=5):
    """Create the zlib compressor stored in ``self.compressor``.

    :param level: zlib compression level (default 5).
    """
    # Pass the requested level through instead of a fixed value.
    self.compressor = zlib.compressobj(level, zlib.DEFLATED)
def __init__(self, level=5):
    """Reset the running size/CRC and create a raw-deflate compressor.

    :param level: zlib compression level (default 5).
    """
    self.size, self.first_data = 0, True
    self.crc = zlib.crc32(b"")
    # Negative wbits: raw deflate stream without zlib header or checksum.
    self.compressor = zlib.compressobj(
        level,
        zlib.DEFLATED,
        -zlib.MAX_WBITS,
        zlib.DEF_MEM_LEVEL,
        0,
    )
def __init__(self, producer, level=6):
    """Store ``producer``, create a zlib compressor, then call
    ``self.override()``.

    :param producer: object kept in ``self.producer``.
    :param level: zlib compression level (default 6).
    """
    self.producer = producer
    deflater = zlib.compressobj(level, zlib.DEFLATED)
    self.compressor = deflater
    # Runs last, once both attributes above are in place.
    self.override()
def __init__(self, out):
    """Create a level-9 gzip compressor and remember ``out``.

    :param out: object stored unchanged in ``self.out``.
    """
    # Adding 16 to the window bits makes zlib write a gzip header/trailer.
    gzip_wbits = zlib.MAX_WBITS + 16
    self.compressor = zlib.compressobj(9, zlib.DEFLATED, gzip_wbits)
    self.out = out
def compress(buff):
    """Encode ``buff`` as UTF-8 and return it as a level-6 gzip stream."""
    encoded = buff.encode('utf-8')
    gzipper = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
    return gzipper.compress(encoded) + gzipper.flush()


# plain "inflate"
def compress_alt(buff):
    """Encode ``buff`` as UTF-8 and return the bare deflate data.

    The text is compressed at level 6 into a zlib stream, from which the
    first two and the last four bytes are then removed.
    """
    encoded = buff.encode('utf-8')
    packer = zlib.compressobj(6, zlib.DEFLATED)
    zlib_stream = packer.compress(encoded) + packer.flush()
    # drop the 2-byte zlib header and the 4-byte checksum tail
    return zlib_stream[2:-4]


# Brotli
def frame_outbound(self, proto, opcode, rsv, data, fin):
    """Compress the payload of an outgoing frame when its opcode allows it.

    Frames whose opcode is not compressible are returned untouched.  For any
    frame other than a continuation the first RSV bit is set.  The payload is
    run through a raw-deflate compressor that is created on demand; on the
    final frame the stream is sync-flushed and its last four bytes removed.

    :param proto: object whose ``client`` flag selects the client- or
        server-side settings.
    :param opcode: frame opcode.
    :param rsv: current RSV bits.
    :param data: frame payload.
    :param fin: true when this is the last frame of the message.
    :return: ``(rsv, data)`` tuple to put on the wire.
    """
    if not self._compressible_opcode(opcode):
        return (rsv, data)

    if opcode is not Opcode.CONTINUATION:
        rsv = RsvBits(True, *rsv[1:])

    if self._compressor is None:
        # A compressor can only be missing at the start of a message.
        assert opcode is not Opcode.CONTINUATION
        bits = (self.client_max_window_bits if proto.client
                else self.server_max_window_bits)
        self._compressor = zlib.compressobj(
            zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -int(bits))

    data = self._compressor.compress(bytes(data))
    if not fin:
        return (rsv, data)

    # NOTE(review): the context reset below is assumed to belong to the
    # final-frame branch; confirm against the original indentation.
    data = (data + self._compressor.flush(zlib.Z_SYNC_FLUSH))[:-4]

    no_context_takeover = (self.client_no_context_takeover if proto.client
                           else self.server_no_context_takeover)
    if no_context_takeover:
        self._compressor = None

    return (rsv, data)
def zlib_compressobj(level=6, method=zlib.DEFLATED, wbits=15, memlevel=8,
                     strategy=zlib.Z_DEFAULT_STRATEGY):
    """Return ``zlib.compressobj`` built from the five given settings.

    Every argument is forwarded positionally, in signature order.
    """
    settings = (level, method, wbits, memlevel, strategy)
    return zlib.compressobj(*settings)
def gzip_app_iter(app_iter):
    """Yield the pieces of a gzip stream wrapping the items of ``app_iter``.

    Order of the output: the module-level ``_gzip_header``, one compressed
    piece per input item, the compressor's final flush, then eight bytes
    holding the CRC-32 and the total size as little-endian 32-bit values.
    """
    mask32 = 0xffffffff
    total = 0
    checksum = zlib.crc32(b"") & mask32
    # Raw deflate (negative wbits): header and trailer are emitted by hand.
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for chunk in app_iter:
        total += len(chunk)
        checksum = zlib.crc32(chunk, checksum) & mask32
        yield deflater.compress(chunk)
    yield deflater.flush()
    yield struct.pack("<2L", checksum, total & mask32)
def gzip_encode(content):
    """Return ``content`` compressed as a level-9 gzip stream."""
    # OR-ing 16 into the window bits selects the gzip container.
    packer = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    body = packer.compress(content)
    return body + packer.flush()
def __init__(self):
    """Create the level-6 raw-deflate compressor kept in ``self.compressor``."""
    level, mem_level = 6, 8
    # Negative wbits: no zlib header or checksum around the deflate data.
    self.compressor = zlib.compressobj(
        level, zlib.DEFLATED, -zlib.MAX_WBITS, mem_level)
def gzipstream(data):
    """Return ``data`` compressed as a level-9 gzip stream."""
    import zlib

    # compatible to Windows GZipStream
    packer = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS + 16)
    return packer.compress(data) + packer.flush()
def __init__(self, filename, mode="rb", compresslevel=9):
    """Open a compressed file for reading or writing.

    :param filename: a path (an instance of ``_basestring``), which is opened
        with ``io.open`` and owned by this object, or an existing object with
        a ``read`` or ``write`` attribute, which is used as-is.
    :param mode: ``"rb"`` to read or ``"wb"`` to write.
    :param compresslevel: integer from 1 to 9.
    :raises ValueError: for an invalid ``compresslevel`` or ``mode``.
    :raises TypeError: if ``filename`` is neither a path nor file-like.
    """
    # This lock must be recursive, so that BufferedIOBase's
    # readline(), readlines() and writelines() don't deadlock.
    self._lock = RLock()
    self._fp = None
    self._closefp = False
    self._mode = _MODE_CLOSED
    self._pos = 0
    self._size = -1

    level_ok = isinstance(compresslevel, int) and 1 <= compresslevel <= 9
    if not level_ok:
        raise ValueError("'compresslevel' must be an integer "
                         "between 1 and 9. You provided 'compresslevel={}'"
                         .format(compresslevel))

    if mode == "rb":
        mode_code = _MODE_READ
        self._decompressor = zlib.decompressobj(self.wbits)
        self._buffer = b""
        self._buffer_offset = 0
    elif mode == "wb":
        mode_code = _MODE_WRITE
        self._compressor = zlib.compressobj(compresslevel,
                                            zlib.DEFLATED,
                                            self.wbits,
                                            zlib.DEF_MEM_LEVEL,
                                            0)
    else:
        raise ValueError("Invalid mode: %r" % (mode,))

    # Only a file opened here is marked as ours to close.
    opened_here = isinstance(filename, _basestring)
    if opened_here:
        self._fp = io.open(filename, mode)
    elif hasattr(filename, "read") or hasattr(filename, "write"):
        self._fp = filename
    else:
        raise TypeError("filename must be a str or bytes object, "
                        "or a file")
    self._closefp = opened_here
    self._mode = mode_code
def _create_compressor(self):
    """Return a new raw-deflate compressor at zlib's default level.

    The window size is ``self._max_wbits``, negated to select a raw stream.
    """
    raw_wbits = -self._max_wbits
    return zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED,
                            raw_wbits)
def compress_zlib(data):
    """Return ``data`` as a zlib stream, or ``None`` if compression fails.

    Uses level 9 with the maximum window size.  An exception raised while
    compressing is reported through ``print_error`` and swallowed.
    """
    packer = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS)
    try:
        return packer.compress(data) + packer.flush()
    except Exception as e:
        print_error('Error when compressing with Zlib: {0}'.format(e))
    return None


# def decompress_zlib(data):
#     decompressed = None
#     i = -15
#     # The absolute value of wbits is the base two logarithm of the size of the history buffer (the "window size")
#     # used when compressing data
#     valid_wbits = []
#     truncated_wbits = []
#     decompressed = None
#     for i in range(-15,16):
#         try:
#             decompressed = zlib.decompress(data, i)
#             valid_wbits.append(i)
#         except zlib.error as e:
#             # Error -5 : incomplete or truncated data input implies that it's zlib compressed but the stream is not complete
#             if str(e).startswith('Error -5'):
#                 tuncated_wbits.append(i)
#     return (valid_wbits, truncated_wbits, decompressed)
def compress_deflate(data):
    """Return ``data`` as a raw deflate stream, or ``None`` on failure.

    Uses level 9 with negative window bits, so no header or checksum is
    written.  An exception raised while compressing is reported through
    ``print_error`` and swallowed.
    """
    packer = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS)
    try:
        return packer.compress(data) + packer.flush()
    except Exception as e:
        print_error('Error when compressing with Deflate: {0}'.format(e))
    return None
def __init__(self, window_bits):
    """Set up the logger and a default-level raw-deflate compressor.

    :param window_bits: window size; it is negated before being handed to
        zlib, which selects a raw deflate stream.
    """
    self._logger = get_class_logger(self)
    raw_wbits = -window_bits
    self._compress = zlib.compressobj(
        zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, raw_wbits)
def compress(body, compress_level):
    """Yield the pieces of a gzip stream wrapping the chunks of ``body``.

    The header fields are produced one by one, followed by one compressed
    piece per chunk of ``body``, the compressor's final flush, and the
    CRC-32 and size trailer fields.

    :param body: iterable of byte chunks.
    :param compress_level: zlib compression level.
    """
    import zlib

    low32 = int('FFFFFFFF', 16)  # mask keeping values within 32 bits

    # See http://www.gzip.org/zlib/rfc-gzip.html
    yield ntob('\x1f\x8b')       # ID1 and ID2: gzip marker
    yield ntob('\x08')           # CM: compression method
    yield ntob('\x00')           # FLG: none set
    # MTIME: 4 bytes
    yield struct.pack('<L', int(time.time()) & low32)
    yield ntob('\x02')           # XFL: max compression, slowest algo
    yield ntob('\xff')           # OS: unknown

    checksum = zlib.crc32(ntob(''))
    total = 0
    # Raw deflate (negative wbits); the gzip framing is written by hand.
    deflater = zlib.compressobj(compress_level,
                                zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)
    for chunk in body:
        total += len(chunk)
        checksum = zlib.crc32(chunk, checksum)
        yield deflater.compress(chunk)
    yield deflater.flush()

    # CRC32: 4 bytes
    yield struct.pack('<L', checksum & low32)
    # ISIZE: 4 bytes
    yield struct.pack('<L', total & low32)
def _write_block(self, block):
    """Compress ``block`` and write it to ``self._handle`` as one unit.

    The unit consists of the module-level ``_bgzf_header``, a 2-byte block
    size field, the raw-deflate payload, the CRC-32 of ``block`` and its
    uncompressed length.

    :param block: bytes to store; at most 65536 bytes long.
    """
    # print("Saving %i bytes" % len(block))
    assert len(block) <= 65536
    # Giving a negative window bits means no gzip/zlib headers,
    # -15 used in samtools
    c = zlib.compressobj(self.compresslevel,
                         zlib.DEFLATED,
                         -15,
                         zlib.DEF_MEM_LEVEL,
                         0)
    compressed = c.compress(block) + c.flush()
    del c
    assert len(compressed) < 65536, \
        "TODO - Didn't compress enough, try less data in this block"
    bsize = struct.pack("<H", len(compressed) + 25)  # includes -1
    # Computed once; the mask makes the value unsigned on every Python
    # platform, so a single "<I" pack is enough.
    crc = struct.pack("<I", zlib.crc32(block) & 0xffffffff)
    uncompressed_length = struct.pack("<I", len(block))
    # Fixed 16 bytes,
    # gzip magic bytes (4) mod time (4),
    # gzip flag (1), os (1), extra length which is six (2),
    # sub field which is BC (2), sub field length of two (2),
    # Variable data,
    # 2 bytes: block length as BC sub field (2)
    # X bytes: the data
    # 8 bytes: crc (4), uncompressed data length (4)
    data = _bgzf_header + bsize + compressed + crc + uncompressed_length
    self._handle.write(data)
def start_compress_message(self):
    """Make sure ``self._compressor`` is ready for the next message.

    A new raw-deflate compressor is created when none exists yet, or when
    the "no context takeover" flag of the active side (server or client,
    chosen by ``self._is_server``) is set; otherwise the existing one is
    kept.
    """
    # compressobj([level[, method[, wbits[, mem_level[, strategy]]]]])
    # http://bugs.python.org/issue19278
    # http://hg.python.org/cpython/rev/c54c8e71b79a
    serving = self._is_server

    if self._compressor is not None:
        if serving:
            start_fresh = self.server_no_context_takeover
        else:
            start_fresh = self.client_no_context_takeover
        if not start_fresh:
            return

    if serving:
        window_bits = self.server_max_window_bits
    else:
        window_bits = self.client_max_window_bits
    self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                                        zlib.DEFLATED,
                                        -window_bits,
                                        self.mem_level)
def __init__(self, compressLevel, request):
    """Create a gzip compressor and remember the request.

    :param compressLevel: zlib compression level.
    :param request: object stored unchanged in ``self._request``.
    """
    # 16 on top of MAX_WBITS makes zlib produce the gzip container.
    gzip_wbits = 16 + zlib.MAX_WBITS
    self._zlibCompressor = zlib.compressobj(compressLevel, zlib.DEFLATED,
                                            gzip_wbits)
    self._request = request