我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用zlib.DEF_MEM_LEVEL。
def compress (filename, input, output): output.write('\037\213\010') # Write the header, ... output.write(chr(FNAME)) # ... flag byte ... statval = os.stat(filename) # ... modification time ... mtime = statval[8] write32(output, mtime) output.write('\002') # ... slowest compression alg. ... output.write('\377') # ... OS (=unknown) ... output.write(filename+'\000') # ... original filename ... crcval = zlib.crc32("") compobj = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0) while True: data = input.read(1024) if data == "": break crcval = zlib.crc32(data, crcval) output.write(compobj.compress(data)) output.write(compobj.flush()) write32(output, crcval) # ... the CRC ... write32(output, statval[6]) # and the file size.
def deflate(data, compresslevel=9): # Compress compress = zlib.compressobj(compresslevel, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0) deflated = compress.compress(data) deflated += compress.flush() # Add PDs compression magic and negated file length (8 bytes) length = int(-len(deflated)) magic = bytearray(b'\xc5\xee\xf7\xff\x00\x00\x00\x00') magic[4] = byte(length, 0) magic[5] = byte(length >> 8, 0) magic[6] = byte(length >> 0x10, 0) magic[7] = byte(length >> 0x18, 0) deflatedwithheader = bytes(magic) + deflated finaldata = xor(deflatedwithheader) return finaldata # Decrypts the GT6TED, removes the magic and negated file length and # decompresses the data
def compress_readable_output(src_file, compress_level=6): crc = zlib.crc32(b"") size = 0 zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, zlib.Z_DEFAULT_STRATEGY) prefix_written = False while True: data = src_file.read(DEFAULT_BUFFER_SIZE) if not data: break size += len(data) crc = zlib.crc32(data, crc) data = zobj.compress(data) if not prefix_written: prefix_written = True data = gzip_prefix() + data yield data yield zobj.flush() + struct.pack(b"<LL", crc & 0xffffffffL, size)
def __init__ (self, level = 5): self.size = 0 self.crc = zlib.crc32(b"") self.compressor = zlib.compressobj (level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0) self.first_data = True
def gzip_app_iter(app_iter): size = 0 crc = zlib.crc32(b"") & 0xffffffff compress = zlib.compressobj(9, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0) yield _gzip_header for item in app_iter: size += len(item) crc = zlib.crc32(item, crc) & 0xffffffff yield compress.compress(item) yield compress.flush() yield struct.pack("<2L", crc, size & 0xffffffff)
def __init__(self, filename, mode="rb", compresslevel=9): # This lock must be recursive, so that BufferedIOBase's # readline(), readlines() and writelines() don't deadlock. self._lock = RLock() self._fp = None self._closefp = False self._mode = _MODE_CLOSED self._pos = 0 self._size = -1 if not isinstance(compresslevel, int) or not (1 <= compresslevel <= 9): raise ValueError("'compresslevel' must be an integer " "between 1 and 9. You provided 'compresslevel={}'" .format(compresslevel)) if mode == "rb": mode_code = _MODE_READ self._decompressor = zlib.decompressobj(self.wbits) self._buffer = b"" self._buffer_offset = 0 elif mode == "wb": mode_code = _MODE_WRITE self._compressor = zlib.compressobj(compresslevel, zlib.DEFLATED, self.wbits, zlib.DEF_MEM_LEVEL, 0) else: raise ValueError("Invalid mode: %r" % (mode,)) if isinstance(filename, _basestring): self._fp = io.open(filename, mode) self._closefp = True self._mode = mode_code elif hasattr(filename, "read") or hasattr(filename, "write"): self._fp = filename self._mode = mode_code else: raise TypeError("filename must be a str or bytes object, " "or a file")
def compress(body, compress_level): """Compress 'body' at the given compress_level.""" import zlib # See http://www.gzip.org/zlib/rfc-gzip.html yield ntob('\x1f\x8b') # ID1 and ID2: gzip marker yield ntob('\x08') # CM: compression method yield ntob('\x00') # FLG: none set # MTIME: 4 bytes yield struct.pack('<L', int(time.time()) & int('FFFFFFFF', 16)) yield ntob('\x02') # XFL: max compression, slowest algo yield ntob('\xff') # OS: unknown crc = zlib.crc32(ntob('')) size = 0 zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0) for line in body: size += len(line) crc = zlib.crc32(line, crc) yield zobj.compress(line) yield zobj.flush() # CRC32: 4 bytes yield struct.pack('<L', crc & int('FFFFFFFF', 16)) # ISIZE: 4 bytes yield struct.pack('<L', size & int('FFFFFFFF', 16))
def _write_block(self, block): # print("Saving %i bytes" % len(block)) assert len(block) <= 65536 # Giving a negative window bits means no gzip/zlib headers, # -15 used in samtools c = zlib.compressobj(self.compresslevel, zlib.DEFLATED, -15, zlib.DEF_MEM_LEVEL, 0) compressed = c.compress(block) + c.flush() del c assert len(compressed) < 65536, \ "TODO - Didn't compress enough, try less data in this block" crc = zlib.crc32(block) # Should cope with a mix of Python platforms... if crc < 0: crc = struct.pack("<i", crc) else: crc = struct.pack("<I", crc) bsize = struct.pack("<H", len(compressed) + 25) # includes -1 crc = struct.pack("<I", zlib.crc32(block) & 0xffffffff) uncompressed_length = struct.pack("<I", len(block)) # Fixed 16 bytes, # gzip magic bytes (4) mod time (4), # gzip flag (1), os (1), extra length which is six (2), # sub field which is BC (2), sub field length of two (2), # Variable data, # 2 bytes: block length as BC sub field (2) # X bytes: the data # 8 bytes: crc (4), uncompressed data length (4) data = _bgzf_header + bsize + compressed + crc + uncompressed_length self._handle.write(data)
def compress(body, compress_level): """Compress 'body' at the given compress_level.""" import zlib # See http://www.gzip.org/zlib/rfc-gzip.html yield ntob('\x1f\x8b') # ID1 and ID2: gzip marker yield ntob('\x08') # CM: compression method yield ntob('\x00') # FLG: none set # MTIME: 4 bytes yield struct.pack("<L", int(time.time()) & int('FFFFFFFF', 16)) yield ntob('\x02') # XFL: max compression, slowest algo yield ntob('\xff') # OS: unknown crc = zlib.crc32(ntob("")) size = 0 zobj = zlib.compressobj(compress_level, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0) for line in body: size += len(line) crc = zlib.crc32(line, crc) yield zobj.compress(line) yield zobj.flush() # CRC32: 4 bytes yield struct.pack("<L", crc & int('FFFFFFFF', 16)) # ISIZE: 4 bytes yield struct.pack("<L", size & int('FFFFFFFF', 16))
def make_zerobin_payload(string_content): compress = zlib.compressobj( 0, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 4) compressed_data = compress.compress(bytes(mangle_string(string_content))) compressed_data += compress.flush() encoded_data = base64.b64encode(compressed_data) key = os.urandom(32) encoded_key = base64.urlsafe_b64encode(key) encrypted_data = sjcl.SJCL().encrypt(encoded_data, encoded_key) return encrypted_data, encoded_key
def filter(self, handler): is_local_client = handler.client_address[0] in ('127.0.0.1', '::1') pacfile = os.path.join(os.path.dirname(os.path.abspath(__file__)), common.PAC_FILE) urlparts = urlparse.urlsplit(handler.path) if handler.command == 'GET' and urlparts.path.lstrip('/') == common.PAC_FILE: if urlparts.query == 'flush': if is_local_client: thread.start_new_thread(PacUtil.update_pacfile, (pacfile,)) else: return 'mock', {'status': 403, 'headers': {'Content-Type': 'text/plain'}, 'body': 'client address %r not allowed' % handler.client_address[0]} if time.time() - os.path.getmtime(pacfile) > common.PAC_EXPIRED: # check system uptime > 30 minutes uptime = get_uptime() if uptime and uptime > 1800: thread.start_new_thread(lambda: os.utime(pacfile, (time.time(), time.time())) or PacUtil.update_pacfile(pacfile), tuple()) with open(pacfile, 'rb') as fp: content = fp.read() if not is_local_client: serving_addr = urlparts.hostname or ProxyUtil.get_listen_ip() content = content.replace('127.0.0.1', serving_addr) headers = {'Content-Type': 'text/plain'} if 'gzip' in handler.headers.get('Accept-Encoding', ''): headers['Content-Encoding'] = 'gzip' compressobj = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED, -zlib.MAX_WBITS, zlib.DEF_MEM_LEVEL, 0) dataio = io.BytesIO() dataio.write('\x1f\x8b\x08\x00\x00\x00\x00\x00\x02\xff') dataio.write(compressobj.compress(content)) dataio.write(compressobj.flush()) dataio.write(struct.pack('<LL', zlib.crc32(content) & 0xFFFFFFFFL, len(content) & 0xFFFFFFFFL)) content = dataio.getvalue() return 'mock', {'status': 200, 'headers': headers, 'body': content}