我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zlib.compressobj()。
def compress_zip(fd,      # type: BinaryIO
                 image,   # type: np.ndarray
                 depth,   # type: int
                 version  # type: int
                 ):
    # type: (...) -> None
    """
    Write a Numpy array to a zip (zlib) compressed stream.

    {}
    """
    image = normalize_image(image, depth)

    if not util.needs_byteswap(image):
        # No byte swapping required: compress the whole image in one call.
        fd.write(zlib.compress(image))
        return

    # Swap each row and push it through one streaming compressor, writing
    # whatever the compressor emits as we go.
    stream = zlib.compressobj()
    for row in image:
        fd.write(stream.compress(util.do_byteswap(row)))
    fd.write(stream.flush())
def gzip_encode_add_padding(content):
    """Build one gzip stream from ``content`` plus replayed and filler data.

    The stream is fed, in order: ``content``; the result of
    ``reorder(...)`` applied to the whole-chunk prefix of ``content``;
    ``CHUNK_SIZE``-sized runs of ``A`` until the computed limit is reached;
    and finally the last ten chunks of ``content``.

    Relies on the module-level names ``CHUNK_SIZE``, ``WRAP_SIZE``,
    ``arg_blocks`` and ``reorder``.  Returns the compressed bytes.
    """
    # Single-argument print() behaves the same on Python 2 and 3.
    print("* Compressing content...")
    # Floor division keeps this an int on Python 3 (it is used as a slice
    # bound below); remainders are deliberately ignored.
    num_chunks = len(content) // CHUNK_SIZE
    gzip_compress = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    pieces = [gzip_compress.compress(content)]

    replay = reorder(content[0:num_chunks * CHUNK_SIZE], arg_blocks)
    assert len(replay) % CHUNK_SIZE == 0
    num_chunks = len(replay) // CHUNK_SIZE  # update the blocks

    print("** Duplicating content (CBC attack)...")
    # duplicate cipher, should result in duplicate plaintext
    # (prefixed by some garbage)
    pieces.append(gzip_compress.compress(replay))

    filler = b"A" * CHUNK_SIZE
    limit = WRAP_SIZE - (num_chunks * CHUNK_SIZE + 10 * CHUNK_SIZE)
    comp_cnt = 0
    while comp_cnt < limit:
        pieces.append(gzip_compress.compress(filler))
        comp_cnt += CHUNK_SIZE

    print("** Copy original padding...")
    # copy valid PKCS7 padding
    pieces.append(
        gzip_compress.compress(content[len(content) - 10 * CHUNK_SIZE:len(content)]))
    pieces.append(gzip_compress.flush())
    print("*** Finished")
    return b"".join(pieces)
def compress(filename, input, output):
    """Write a gzip-framed, deflate-compressed copy of ``input`` to ``output``.

    ``filename`` is stat'ed for the modification time and size that go into
    the header and trailer, and is also stored in the header.  ``input`` is
    read in 1024-byte pieces until it returns an empty string.  Uses the
    module-level ``FNAME`` flag and ``write32`` helper.
    """
    output.write('\037\213\010')        # Write the header, ...
    output.write(chr(FNAME))            # ... flag byte ...

    statval = os.stat(filename)         # ... modification time ...
    write32(output, statval[8])
    output.write('\002')                # ... slowest compression alg. ...
    output.write('\377')                # ... OS (=unknown) ...
    output.write(filename+'\000')       # ... original filename ...

    crcval = zlib.crc32("")
    compobj = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                               zlib.DEF_MEM_LEVEL, 0)
    # Keep reading until read() hands back the empty-string sentinel.
    for piece in iter(lambda: input.read(1024), ""):
        crcval = zlib.crc32(piece, crcval)
        output.write(compobj.compress(piece))
    output.write(compobj.flush())

    write32(output, crcval)             # ... the CRC ...
    write32(output, statval[6])         # and the file size.
def deflate(data, compresslevel=9):
    """Raw-deflate ``data``, prepend an 8-byte header and pass it to ``xor``.

    The header is the four magic bytes ``c5 ee f7 ff`` followed by four bytes
    produced by the external ``byte`` helper from the negated compressed
    length shifted right by 0, 8, 16 and 24 bits.  The header plus payload
    is handed to the external ``xor`` helper, whose result is returned.
    """
    # Compress
    packer = zlib.compressobj(compresslevel, zlib.DEFLATED, -zlib.MAX_WBITS,
                              zlib.DEF_MEM_LEVEL, 0)
    deflated = packer.compress(data) + packer.flush()

    # Add PDs compression magic and negated file length (8 bytes)
    negated = int(-len(deflated))
    magic = bytearray(b'\xc5\xee\xf7\xff\x00\x00\x00\x00')
    for position, shift in enumerate((0, 8, 0x10, 0x18), 4):
        magic[position] = byte(negated >> shift, 0)

    return xor(bytes(magic) + deflated)

# Decrypts the GT6TED, removes the magic and negated file length and
# decompresses the data
def compress_gzip(data):
    """Return ``data`` compressed as a gzip stream, or ``None`` on failure.

    Any exception raised while compressing is reported through the external
    ``print_error`` helper rather than propagated.
    """
    encoder = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    try:
        return encoder.compress(data) + encoder.flush()
    except Exception as e:
        print_error('Error when compressing with Gzip: {0}'.format(e))
        return None

# def decompress_gzip(data):
#     decompressed=None
#     try:
#         decompressed = zlib.decompress(gzip_data, zlib.MAX_WBITS|16)
#     except:
#         pass
#     return decompressed
def print_inventory(file_, package, version, objects):
    """Write a version-2 Sphinx inventory to the binary stream ``file_``.

    A four-line plain-text header naming ``package`` and ``version`` is
    followed by one zlib stream holding a ``"name kind number . -"`` line per
    entry of ``objects``.  Each entry is a 4-tuple whose last item is ignored.
    The stream is flushed before returning.
    """
    header_parts = (
        b'# Sphinx inventory version 2\n',
        b'# Project: ',
        package.encode('utf-8'),
        b'\n',
        b'# Version: ',
        version.encode('utf-8'),
        b'\n',
        b'# The remainder of this file is compressed using zlib.\n',
    )
    for part in header_parts:
        file_.write(part)

    codec = zlib.compressobj()
    for entry in objects:
        name, kind, mysterious_number = entry[0], entry[1], entry[2]
        text = '{0} {1} {2} . -\n'.format(name, kind, mysterious_number)
        file_.write(codec.compress(text.encode('utf-8')))
    file_.write(codec.flush())
    file_.flush()
def compress_file(self):
    """Replace ``self.filename`` with a zlib-compressed copy of itself.

    The data is streamed into ``<filename>.zlib`` in ``READ_AMOUNT``-sized
    reads, prefixed with the original size packed as a big-endian unsigned
    32-bit integer, and the temporary file is then renamed over the original.
    ``self.file`` is left pointing at the (closed) source handle.

    Both handles are closed even when a read or write raises; in that case
    the rename is skipped and the exception propagates.
    """
    # stream-compress to another file then overwrite original
    compressed_filename = '%s.zlib' % self.filename
    self.file = open(self.filename, 'rb')
    try:
        with open(compressed_filename, 'wb') as compressed_file:
            compressor = zlib.compressobj(self.compression_level)
            compressed_file.write(
                struct.pack('>I', os.stat(self.filename).st_size))
            data = self.file.read(READ_AMOUNT)
            while len(data) > 0:
                compressed_file.write(compressor.compress(data))
                data = self.file.read(READ_AMOUNT)
            compressed_file.write(compressor.flush(zlib.Z_FINISH))
    finally:
        self.file.close()
    os.rename(compressed_filename, self.filename)
def compress_readable_output(src_file, compress_level=6):
    """Yield the contents of ``src_file`` as pieces of a gzip stream.

    ``src_file`` is read in ``DEFAULT_BUFFER_SIZE`` pieces.  The header
    returned by the external ``gzip_prefix()`` is prepended to the first
    piece, and the final piece carries the flushed deflate data followed by
    the CRC-32 and the input size packed as two little-endian 32-bit values.

    For empty input the header is still emitted, with the trailer, so the
    output is always a complete stream.  The size field is reduced modulo
    2**32 so it always fits its 32-bit slot.
    """
    crc = zlib.crc32(b"")
    size = 0
    zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS,
                            zlib.DEF_MEM_LEVEL, zlib.Z_DEFAULT_STRATEGY)
    prefix_written = False
    while True:
        data = src_file.read(DEFAULT_BUFFER_SIZE)
        if not data:
            break
        size += len(data)
        crc = zlib.crc32(data, crc)
        data = zobj.compress(data)
        if not prefix_written:
            prefix_written = True
            data = gzip_prefix() + data
        yield data

    # Plain int masks parse on both Python 2 and 3 (no ``L`` suffix).
    tail = zobj.flush() + struct.pack(b"<LL", crc & 0xffffffff,
                                      size & 0xffffffff)
    if not prefix_written:
        # Nothing was read, so the header has not gone out yet.
        tail = gzip_prefix() + tail
    yield tail
def _newKeys(self):
    """
    Switch to the newly negotiated encryption and compression settings.

    Called back by a subclass once a I{MSG_NEWKEYS} message has been
    received.  The pending encryption parameters become current, zlib
    compressor/decompressor objects are created for whichever directions
    negotiated C{b'zlib'}, the key exchange state is reset, and every
    message that was queued during key exchange is sent.
    """
    log.msg('NEW KEYS')
    self.currentEncryptions = self.nextEncryptions

    zlib_name = b'zlib'
    if self.outgoingCompressionType == zlib_name:
        self.outgoingCompression = zlib.compressobj(6)
    if self.incomingCompressionType == zlib_name:
        self.incomingCompression = zlib.decompressobj()

    self._keyExchangeState = self._KEY_EXCHANGE_NONE

    # Detach the queue before sending, so the attribute is already None
    # while the queued packets go out.
    queued = self._blockedByKeyExchange
    self._blockedByKeyExchange = None
    for messageType, payload in queued:
        self.sendPacket(messageType, payload)
def add_compression_filter(self, encoding='deflate', *, EOF_MARKER=EOF_MARKER, EOL_MARKER=EOL_MARKER):
    """Compress incoming stream with deflate or gzip encoding.

    Generator-based filter: each chunk sent in produces two yields, first
    the compressed bytes (or the flushed remainder when the chunk is
    ``EOF_MARKER``) and then the matching marker.  The next chunk is
    received at that second yield.
    """
    if encoding == 'gzip':
        zlib_mode = 16 + zlib.MAX_WBITS
    else:
        zlib_mode = -zlib.MAX_WBITS
    zcomp = zlib.compressobj(wbits=zlib_mode)

    chunk = yield
    while True:
        if chunk is EOF_MARKER:
            payload, marker = zcomp.flush(), EOF_MARKER
        else:
            payload, marker = zcomp.compress(chunk), EOL_MARKER
        yield payload
        chunk = yield marker
def test_recording_gzipped_responses_as_text(vts_rec_on, httpserver):
    """A gzip-encoded response is recorded in the cassette as plain text."""
    expected_text = "Hello!"

    # http://stackoverflow.com/a/22310760
    encoder = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    payload = encoder.compress(expected_text.encode()) + encoder.flush()
    httpserver.serve_content(
        payload, 200, headers={"Content-Encoding": "gzip"})

    url = "{}/".format(httpserver.url)
    response = requests.get(url)
    assert response.status_code == 200
    assert response.text == expected_text

    cassette = vts_rec_on.cassette
    assert len(cassette) == 1
    recorded = cassette[0]
    assert recorded['request']['url'] == url
    assert "Content-Encoding" in recorded['response']['headers']
    assert recorded['response']['body'] == expected_text

# enable pytester fixture which allows running pytests within tests
def test_response_with_precompressed_body_gzip(loop, test_client):
    """A handler returning a pre-gzipped body is read back as plain data."""
    @asyncio.coroutine
    def handler(request):
        # Compress by hand and label the body as gzip.
        encoder = zlib.compressobj(wbits=16 + zlib.MAX_WBITS)
        body = encoder.compress(b'mydata') + encoder.flush()
        return web.Response(body=body, headers={'Content-Encoding': 'gzip'})

    app = web.Application()
    app.router.add_get('/', handler)
    client = yield from test_client(app)

    resp = yield from client.get('/')
    assert 200 == resp.status
    received = yield from resp.read()
    assert b'mydata' == received
    assert resp.headers.get('Content-Encoding') == 'gzip'
def test_response_with_precompressed_body_deflate(loop, test_client):
    """A handler returning a pre-deflated body is read back as plain data."""
    @asyncio.coroutine
    def handler(request):
        # Compress by hand (negative wbits) and label the body as deflate.
        encoder = zlib.compressobj(wbits=-zlib.MAX_WBITS)
        body = encoder.compress(b'mydata') + encoder.flush()
        return web.Response(body=body,
                            headers={'Content-Encoding': 'deflate'})

    app = web.Application()
    app.router.add_get('/', handler)
    client = yield from test_client(app)

    resp = yield from client.get('/')
    assert 200 == resp.status
    received = yield from resp.read()
    assert b'mydata' == received
    assert resp.headers.get('Content-Encoding') == 'deflate'
def deflate(data):
    """Compress ``data`` with a fresh default compressor and sync-flush it.

    The compressor is flushed with ``Z_SYNC_FLUSH`` rather than finished, so
    the returned bytes do not carry the end-of-stream trailer.
    """
    compressor = zlib.compressobj()
    body = compressor.compress(data)
    return body + compressor.flush(zlib.Z_SYNC_FLUSH)
def start_compressing(self):
    """start_compressing()

    Enable deflate compression on the socket (RFC 4978).

    Creates ``self.decompressor`` and ``self.compressor``, both configured
    with a window-bits value of -15.
    """
    # rfc 1951 - pure DEFLATE, so use -15 for both windows
    raw_window = -15
    self.decompressor = zlib.decompressobj(raw_window)
    self.compressor = zlib.compressobj(
        zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, raw_window)
def _create_compressor(self):
    """Return a new deflate compressor for this object.

    The level comes from ``tornado.web.GZipContentEncoding.GZIP_LEVEL`` and
    the window-bits argument is the negation of ``self._max_wbits``.
    """
    level = tornado.web.GZipContentEncoding.GZIP_LEVEL
    window_bits = -self._max_wbits
    return zlib.compressobj(level, zlib.DEFLATED, window_bits)
def write_response(handler, code, headers, data=""):
    """Send a status line, headers and an optionally gzip-compressed body.

    handler -- object exposing ``send_response``, ``send_header``,
               ``end_headers`` and a writable ``wfile``.
    code    -- status code passed to ``send_response``.
    headers -- iterable of ``"Name:value"`` strings, split at the first
               colon (raises ValueError if one has no colon).
    data    -- response body; it is gzip-compressed only when that makes it
               smaller, in which case ``Content-Encoding`` and
               ``Content-Length`` headers are added.

    The header section is always terminated, including when ``data`` is
    empty.
    """
    handler.send_response(code)
    for header in headers:
        i = header.index(":")
        handler.send_header(header[:i], header[i + 1:])

    content = data
    if data:
        zlib_encode = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
        compressed = zlib_encode.compress(data) + zlib_encode.flush()
        if len(compressed) < len(data):
            handler.send_header('Content-Encoding', 'gzip')
            handler.send_header('Content-Length', len(compressed))
            content = compressed

    handler.end_headers()
    if content:
        handler.wfile.write(content)
def ssh_NEWKEYS(self, packet):
    """Handle a NEWKEYS packet: adopt the pending encryption settings.

    A non-empty ``packet`` triggers ``sendDisconnect``; processing then
    continues as in the original code.  zlib (de)compression objects are
    created for each direction whose negotiated type is ``'zlib'``.
    """
    has_payload = packet != ''
    if has_payload:
        self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")

    self.currentEncryptions = self.nextEncryptions

    if 'zlib' == self.outgoingCompressionType:
        self.outgoingCompression = zlib.compressobj(6)
        #self.outgoingCompression.compress = lambda x: self.outgoingCompression.compress(x) + self.outgoingCompression.flush(zlib.Z_SYNC_FLUSH)
    if 'zlib' == self.incomingCompressionType:
        self.incomingCompression = zlib.decompressobj()
def ssh_NEWKEYS(self, packet):
    """Handle a NEWKEYS packet, deferring if the next keys are not ready.

    A non-empty ``packet`` triggers ``sendDisconnect`` (processing then
    continues).  When ``self.nextEncryptions.enc_block_size`` is falsy the
    method only records ``self._gotNewKeys = 1`` and returns.  Otherwise the
    pending encryptions become current, zlib objects are created for each
    direction negotiated as ``'zlib'``, and ``connectionSecure()`` is called.
    """
    has_payload = packet != ''
    if has_payload:
        self.sendDisconnect(DISCONNECT_PROTOCOL_ERROR, "NEWKEYS takes no data")

    pending = self.nextEncryptions
    if not pending.enc_block_size:
        self._gotNewKeys = 1
        return

    self.currentEncryptions = self.nextEncryptions

    if 'zlib' == self.outgoingCompressionType:
        self.outgoingCompression = zlib.compressobj(6)
        #self.outgoingCompression.compress = lambda x: self.outgoingCompression.compress(x) + self.outgoingCompression.flush(zlib.Z_SYNC_FLUSH)
    if 'zlib' == self.incomingCompressionType:
        self.incomingCompression = zlib.decompressobj()

    self.connectionSecure()
def handle(self):
    """Stream a zlib-compressed copy of a client-requested file.

    Reads up to 1024 bytes from ``self.request`` as a UTF-8 filename, then
    sends the file back compressed at level 1, reading ``BLOCK_SIZE`` bytes
    at a time.  Data still buffered by the compressor is flushed and sent in
    ``BLOCK_SIZE`` pieces at the end.

    ``sendall`` is used so that each compressed piece is written completely;
    plain ``send`` may transmit only part of the buffer.
    """
    compressor = zlib.compressobj(1)

    # Find out what file the client wants
    # NOTE(security): the name comes straight from the peer and is opened
    # unchecked, so a client can request any file this process can read.
    filename = self.request.recv(1024).decode('utf-8')
    self.logger.debug('client asked for: %r', filename)

    # Send chunks of the file as they are compressed
    with open(filename, 'rb') as source:
        while True:
            block = source.read(BLOCK_SIZE)
            if not block:
                break
            self.logger.debug('RAW %r', block)
            compressed = compressor.compress(block)
            if compressed:
                self.logger.debug(
                    'SENDING %r', binascii.hexlify(compressed))
                self.request.sendall(compressed)
            else:
                self.logger.debug('BUFFERING')

    # Send any data being buffered by the compressor
    remaining = compressor.flush()
    while remaining:
        to_send = remaining[:BLOCK_SIZE]
        remaining = remaining[BLOCK_SIZE:]
        self.logger.debug('FLUSHING %r', binascii.hexlify(to_send))
        self.request.sendall(to_send)
def __init__(self, map_, parent=False):
    """Store the map's generator and create a zlib compressor.

    ``parent=True`` enables saving all data sent instead of just deleting
    it afterwards.  The compressor uses the module-level
    ``COMPRESSION_LEVEL``.
    """
    keep_sent_data = parent
    self.parent = keep_sent_data
    self.generator = map_.get_generator()
    self.compressor = zlib.compressobj(COMPRESSION_LEVEL)
def __init__(self):
    """Create the level-9 zlib compressor held in ``self.z``."""
    level = 9
    self.z = zlib.compressobj(level)
def _CalculateCompressedSize(file_path):
    """Return the size in bytes of ``file_path`` after zlib compression.

    The file is read in 256 KiB pieces and fed through one default-level
    compressor; only the lengths of the produced output are summed, so the
    compressed data itself is never kept.
    """
    CHUNK_SIZE = 256 * 1024
    compressor = zlib.compressobj()
    total_size = 0
    with open(file_path, 'rb') as f:
        # The file is opened in binary mode, so end-of-file is signalled by
        # b''.  A '' sentinel never matches bytes on Python 3 and would loop
        # forever.
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b''):
            total_size += len(compressor.compress(chunk))
    total_size += len(compressor.flush())
    return total_size
def __init__(self, level=5):
    """Create the zlib compressor stored in ``self.compressor``.

    level -- zlib compression level for the compressor (default 5).
             The argument was previously ignored in favour of a
             hard-coded 5; it is now honoured.
    """
    self.compressor = zlib.compressobj(level, zlib.DEFLATED)
def __init__(self, level=5):
    """Initialise the counters and the raw-deflate compressor.

    Sets ``self.size`` to 0, ``self.crc`` to the CRC-32 of empty input,
    ``self.first_data`` to True, and creates ``self.compressor`` at the
    given ``level`` with negative window bits.
    """
    self.first_data = True
    self.size = 0
    self.crc = zlib.crc32(b"")
    self.compressor = zlib.compressobj(
        level,
        zlib.DEFLATED,
        -zlib.MAX_WBITS,
        zlib.DEF_MEM_LEVEL,
        0,
    )
def __init__(self, producer, level=6):
    """Wrap ``producer`` with a zlib compressor at ``level``.

    Stores the producer, creates ``self.compressor`` and then calls
    ``self.override()``.
    """
    self.producer = producer
    deflater = zlib.compressobj(level, zlib.DEFLATED)
    self.compressor = deflater
    self.override()
def __init__(self, out):
    """Remember ``out`` and create a level-9 gzip-framed compressor."""
    gzip_wbits = zlib.MAX_WBITS + 16
    self.compressor = zlib.compressobj(9, zlib.DEFLATED, gzip_wbits)
    self.out = out
def compress(buff):
    """Encode the text ``buff`` as UTF-8 and return it gzip-compressed."""
    raw = buff.encode('utf-8')
    encoder = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
    return encoder.compress(raw) + encoder.flush()

# plain "inflate"
def compress_alt(buff):
    """Encode ``buff`` as UTF-8 and return only the deflate payload.

    The data is compressed as a zlib stream and then the first two and last
    four bytes are removed.
    """
    raw = buff.encode('utf-8')
    encoder = zlib.compressobj(6, zlib.DEFLATED)
    framed = encoder.compress(raw) + encoder.flush()
    # Drop the 2 leading and 4 trailing bytes of framing around the payload.
    return framed[2:-4]

# Brotli
def frame_outbound(self, proto, opcode, rsv, data, fin):
    """Compress an outgoing frame payload and return ``(rsv, data)``.

    Frames whose opcode is rejected by ``self._compressible_opcode`` are
    returned untouched.  Otherwise the first reserved bit is set on every
    non-CONTINUATION frame, the payload is run through a lazily created
    raw-deflate compressor, and on the final frame the compressor is
    sync-flushed and the last four bytes of the flushed output are removed.

    NOTE(review): this block reached review with its indentation stripped;
    the nesting below (in particular the context-takeover handling sitting
    inside ``if fin:``) is reconstructed and should be confirmed.
    """
    if not self._compressible_opcode(opcode):
        return (rsv, data)

    # Only the first frame of a message gets the first RSV bit set.
    if opcode is not Opcode.CONTINUATION:
        rsv = RsvBits(True, *rsv[1:])

    if self._compressor is None:
        # A compressor is only ever created at the start of a message.
        assert opcode is not Opcode.CONTINUATION
        # The window size depends on which side of the connection we are.
        if proto.client:
            bits = self.client_max_window_bits
        else:
            bits = self.server_max_window_bits
        # Negative wbits selects a raw deflate stream (no zlib header).
        self._compressor = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION,
                                            zlib.DEFLATED, -int(bits))

    data = self._compressor.compress(bytes(data))

    if fin:
        data += self._compressor.flush(zlib.Z_SYNC_FLUSH)
        # Remove the final four bytes produced by the sync flush.
        data = data[:-4]

        if proto.client:
            no_context_takeover = self.client_no_context_takeover
        else:
            no_context_takeover = self.server_no_context_takeover

        # Dropping the compressor forces a fresh one for the next message.
        if no_context_takeover:
            self._compressor = None

    return (rsv, data)
def write_to_stdout(data):
    """Write ``data`` plus a newline to ``sys.stdout`` and flush it."""
    stream = sys.stdout
    stream.write(data + '\n')
    stream.flush()

# The standard zlib.compressobj() accepts only positional arguments.
def zlib_compressobj(level=6, method=zlib.DEFLATED, wbits=15, memlevel=8, strategy=zlib.Z_DEFAULT_STRATEGY):
    """Keyword-friendly wrapper around ``zlib.compressobj``.

    Accepts the five settings by name or position and forwards them to
    ``zlib.compressobj`` positionally, returning the new compressor.
    """
    positional = (level, method, wbits, memlevel, strategy)
    return zlib.compressobj(*positional)
def gzip_app_iter(app_iter):
    """Yield the items of ``app_iter`` re-encoded as one gzip stream.

    Yields the module-level ``_gzip_header`` first, then one compressed
    piece per input item, then the flushed remainder, and finally the
    trailer holding the CRC-32 and the total size (both masked to 32 bits).
    """
    mask = 0xffffffff
    total = 0
    checksum = zlib.crc32(b"") & mask
    deflater = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS,
                                zlib.DEF_MEM_LEVEL, 0)

    yield _gzip_header
    for item in app_iter:
        total += len(item)
        checksum = zlib.crc32(item, checksum) & mask
        yield deflater.compress(item)
    yield deflater.flush()
    yield struct.pack("<2L", checksum, total & mask)
def compress_zip_prediction(fd,      # type: BinaryIO
                            image,   # type: np.ndarray
                            depth,   # type: int
                            version  # type: int
                            ):
    # type: (...) -> None
    """
    Write a Numpy array to a zip (zlib) with prediction compressed stream.

    Not supported for 1- or 32-bit images.  Any depth other than 8 or 16
    raises ValueError.

    {}
    """
    if depth == 1:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not supported for 1-bit images")
    elif depth == 32:  # pragma: no cover
        raise ValueError(
            "zip with prediction is not implemented for 32-bit images")
    elif depth == 8:
        encoder = packbits.encode_prediction_8bit
    elif depth == 16:
        encoder = packbits.encode_prediction_16bit
    else:
        # Without this branch an unexpected depth would only surface later
        # as an unbound-local error on ``encoder``.
        raise ValueError(
            "zip with prediction is not supported for %r-bit images"
            % (depth,))

    compressor = zlib.compressobj()
    for row in image:
        # NOTE(review): the encoder's return value is discarded and
        # ndarray.flatten() returns a copy -- confirm the prediction is
        # actually applied to the data that gets written below.
        encoder(row.flatten())
        row = util.ensure_bigendian(row)
        fd.write(compressor.compress(row))
    fd.write(compressor.flush())
def gzip_encode(content):
    """Return ``content`` compressed as a level-9 gzip stream."""
    encoder = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS | 16)
    body = encoder.compress(content)
    tail = encoder.flush()
    return body + tail
def main():
    """Demonstrate one-shot and incremental zlib (de)compression.

    Reads the file named by the first command-line argument (or the script
    itself when none is given), compresses and decompresses it in one step
    at level 1 and then in 256-byte stages at level 9, printing the
    original, compressed and round-tripped sizes for each approach.
    """
    if len(sys.argv) > 1:
        filename = sys.argv[1]
    else:
        filename = sys.argv[0]
    # Single pre-formatted strings print identically on Python 2 and 3.
    print('Reading %s' % filename)
    with open(filename, 'rb') as f:  # Get the data to compress
        s = f.read()

    size_report = ' Original: %d Compressed: %d Uncompressed: %d'

    # First, we'll compress the string in one step
    comptext = zlib.compress(s, 1)
    decomp = zlib.decompress(comptext)
    print('1-step compression: (level 1)')
    print(size_report % (len(s), len(comptext), len(decomp)))

    # Now, let's compress the string in stages; set chunk to work in
    # smaller steps
    chunk = 256
    compressor = zlib.compressobj(9)
    decompressor = zlib.decompressobj()

    packed = [compressor.compress(s[i:i + chunk])
              for i in range(0, len(s), chunk)]
    # Don't forget to call flush()!!
    packed.append(compressor.flush())
    comptext = b''.join(packed)

    unpacked = [decompressor.decompress(comptext[i:i + chunk])
                for i in range(0, len(comptext), chunk)]
    unpacked.append(decompressor.flush())
    decomp = b''.join(unpacked)

    print('Progressive compression (level 9):')
    print(size_report % (len(s), len(comptext), len(decomp)))
def _init_zlib(self):
    '''
    Internal method for setting up the zlib compression and
    decompression objects.

    Creates ``self._zcomp_read`` (a decompressor) and ``self._zcomp_write``
    (a compressor at ``self.compresslevel``).
    '''
    reader = zlib.decompressobj()
    self._zcomp_read = reader
    writer = zlib.compressobj(self.compresslevel)
    self._zcomp_write = writer
def dump_inventory(self):
    """Write the object inventory file into ``self.outdir``.

    The file starts with a four-line UTF-8 text header naming the project
    and version, followed by a single level-9 zlib stream holding one line
    per object returned by each domain's ``get_objects()``, sorted within
    each domain.
    """
    self.info(bold('dumping object inventory... '), nonl=True)
    with open(path.join(self.outdir, INVENTORY_FILENAME), 'wb') as f:
        header = (u'# Sphinx inventory version 2\n'
                  u'# Project: %s\n'
                  u'# Version: %s\n'
                  u'# The remainder of this file is compressed using zlib.\n'
                  % (self.config.project, self.config.version))
        f.write(header.encode('utf-8'))

        compressor = zlib.compressobj(9)
        for domainname, domain in iteritems(self.env.domains):
            for entry in sorted(domain.get_objects()):
                name, dispname, objtype, docname, anchor, prio = entry
                if anchor.endswith(name):
                    # this can shorten the inventory by as much as 25%
                    anchor = anchor[:-len(name)] + '$'
                uri = self.get_target_uri(docname) + '#' + anchor
                if dispname == name:
                    dispname = u'-'
                line = u'%s %s:%s %s %s %s\n' % (
                    name, domainname, objtype, prio, uri, dispname)
                f.write(compressor.compress(line.encode('utf-8')))
        f.write(compressor.flush())
    self.info('done')
def __init__(self):
    """Create the level-6 raw-deflate compressor in ``self.compressor``."""
    level = 6
    mem_level = 8
    raw_wbits = -zlib.MAX_WBITS  # negative wbits: no zlib header/trailer
    self.compressor = zlib.compressobj(level, zlib.DEFLATED, raw_wbits,
                                       mem_level)