我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zlib.compress()。
def copy_from_host(module): compress = module.params.get('compress') src = module.params.get('src') if not os.path.exists(src): module.fail_json(msg="file not found: {}".format(src)) if not os.access(src, os.R_OK): module.fail_json(msg="file is not readable: {}".format(src)) mode = oct(os.stat(src).st_mode & 0o777) with open(src, 'rb') as f: raw_data = f.read() sha1 = hashlib.sha1(raw_data).hexdigest() data = zlib.compress(raw_data) if compress else raw_data module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode, source=src)
def main(): argument_spec = dict( compress=dict(default=True, type='bool'), dest=dict(type='str'), mode=dict(default='0644', type='str'), sha1=dict(default=None, type='str'), src=dict(required=True, type='str') ) module = AnsibleModule(argument_spec) dest = module.params.get('dest') try: if dest: copy_to_host(module) else: copy_from_host(module) except Exception: module.exit_json(failed=True, changed=True, msg=repr(traceback.format_exc())) # import module snippets
def get_console_log(session, arg_dict): try: raw_dom_id = arg_dict['dom_id'] except KeyError: raise dom0_pluginlib.PluginError("Missing dom_id") try: dom_id = int(raw_dom_id) except ValueError: raise dom0_pluginlib.PluginError("Invalid dom_id") logfile = open(CONSOLE_LOG_FILE_PATTERN % dom_id, 'rb') try: try: log_content = _last_bytes(logfile) except IOError, e: # noqa msg = "Error reading console: %s" % e logging.debug(msg) raise dom0_pluginlib.PluginError(msg) finally: logfile.close() return base64.b64encode(zlib.compress(log_content))
def test_compressed(self): """ ByteArrays can be compressed. Test the C{compressed} attribute for validity. """ try: import zlib except ImportError: self.skipTest('zlib is missing') ba = amf3.ByteArray() self.assertFalse(ba.compressed) z = zlib.compress('b' * 100) ba = amf3.ByteArray(z) self.assertTrue(ba.compressed) z = zlib.compress('\x00' * 100) ba = amf3.ByteArray(z) self.assertTrue(ba.compressed)
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None): self._createdir() # Cache dir can be deleted at any time. fname = self._key_to_file(key, version) self._cull() # make some room if necessary fd, tmp_path = tempfile.mkstemp(dir=self._dir) renamed = False try: with io.open(fd, 'wb') as f: expiry = self.get_backend_timeout(timeout) f.write(pickle.dumps(expiry, -1)) f.write(zlib.compress(pickle.dumps(value), -1)) file_move_safe(tmp_path, fname, allow_overwrite=True) renamed = True finally: if not renamed: os.remove(tmp_path)
def set(self, key, value, timeout=DEFAULT_TIMEOUT, version=None): self._createdir() # Cache dir can be deleted at any time. fname = self._key_to_file(key, version) self._cull() # make some room if necessary fd, tmp_path = tempfile.mkstemp(dir=self._dir) renamed = False try: with io.open(fd, 'wb') as f: expiry = self.get_backend_timeout(timeout) f.write(pickle.dumps(expiry, pickle.HIGHEST_PROTOCOL)) f.write(zlib.compress(pickle.dumps(value, pickle.HIGHEST_PROTOCOL))) file_move_safe(tmp_path, fname, allow_overwrite=True) renamed = True finally: if not renamed: os.remove(tmp_path)
def main(): argument_spec = dict( compress=dict(default=True, type='bool'), dest=dict(type='str'), mode=dict(default='0644', type='str'), sha1=dict(default=None, type='str'), src=dict(required=True, type='str') ) module = AnsibleModule(argument_spec) dest = module.params.get('dest') try: if dest: copy_to_host(module) else: copy_from_host(module) except Exception as e: module.exit_json(failed=True, changed=True, msg=repr(e)) # import module snippets
def test_binary_string(self): # Binary strings should be cacheable cache = self.cache from zlib import compress, decompress value = 'value_to_be_compressed' compressed_value = compress(value.encode()) # Test set cache.set('binary1', compressed_value) compressed_result = cache.get('binary1') self.assertEqual(compressed_value, compressed_result) self.assertEqual(value, decompress(compressed_result).decode()) # Test add cache.add('binary1-add', compressed_value) compressed_result = cache.get('binary1-add') self.assertEqual(compressed_value, compressed_result) self.assertEqual(value, decompress(compressed_result).decode()) # Test set_many cache.set_many({'binary1-set_many': compressed_value}) compressed_result = cache.get('binary1-set_many') self.assertEqual(compressed_value, compressed_result) self.assertEqual(value, decompress(compressed_result).decode())
def compressedField(field): # Decorator for compressed fields: def fget(self): data = getattr(self, field) if data is None: return None return zlib.decompress(data) def fset(self, value): setattr(self, field, zlib.compress(value.encode())) def fdel(self): delattr(self, field) return {'doc': "The compression property for %s." % field, 'fget': fget, 'fset': fset, 'fdel': fdel}
def _open(self, fps=12, loop=True, html=False, compress=False): if not _swf: load_lib() self._arg_fps = int(fps) self._arg_loop = bool(loop) self._arg_html = bool(html) self._arg_compress = bool(compress) self._fp = self.request.get_file() self._framecounter = 0 self._framesize = (100, 100) # For compress, we use an in-memory file object if self._arg_compress: self._fp_real = self._fp self._fp = BytesIO()
def get_encoded_library_string(arch): filepath=None if arch=="x86": filepath=os.path.join("resources","libraryx86.zip") elif arch=="x64": filepath=os.path.join("resources","libraryx64.zip") else: raise Exception("unknown arch %s"%arch) f = StringIO.StringIO() f.write(open(filepath, "rb").read()) zip = zipfile.ZipFile(f) modules = dict([(z.filename, zip.open(z.filename,).read()) for z in zip. infolist() if os.path.splitext(z.filename)[1] in [".py",".pyd",".dll",".pyc",".pyo"]]) return zlib.compress(marshal.dumps(modules),9)
def run(self): print("VEDIO client starts...") while True: try: self.sock.connect(self.ADDR) break except: time.sleep(3) continue print("VEDIO client connected...") while self.cap.isOpened(): ret, frame = self.cap.read() sframe = cv2.resize(frame, (0,0), fx=self.fx, fy=self.fx) data = pickle.dumps(sframe) zdata = zlib.compress(data, zlib.Z_BEST_COMPRESSION) try: self.sock.sendall(struct.pack("L", len(zdata)) + zdata) except: break for i in range(self.interval): self.cap.read()
def _add_array_helper(self, data, array_type, prop_type): assert(isinstance(data, array.array)) assert(data.typecode == array_type) length = len(data) if _IS_BIG_ENDIAN: data = data[:] data.byteswap() data = data.tobytes() # mimic behavior of fbxconverter (also common sense) # we could make this configurable. encoding = 0 if len(data) <= 128 else 1 if encoding == 0: pass elif encoding == 1: data = zlib.compress(data, 1) comp_len = len(data) data = pack('<3I', length, encoding, comp_len) + data self.props_type.append(prop_type) self.props.append(data)
def push(self, data): if hasattr(data, 'size'): self.points_taken += data.size else: try: self.points_taken += len(data) except: try: junk = data + 1.0 self.points_taken += 1 except: raise ValueError("Got data {} that is neither an array nor a float".format(data)) if self.compression == 'zlib': message = {"type": "data", "compression": "zlib", "data": zlib.compress(pickle.dumps(data, -1))} else: message = {"type": "data", "compression": "none", "data": data} # This can be replaced with some other serialization method # and also should support sending via zmq. await self.queue.put(message)
def __init__(self, context, core_src): self._context = context self._present = {'mitogen': [ 'mitogen.ansible', 'mitogen.compat', 'mitogen.compat.pkgutil', 'mitogen.fakessh', 'mitogen.master', 'mitogen.ssh', 'mitogen.sudo', 'mitogen.utils', ]} self.tls = threading.local() self._cache = {} if core_src: self._cache['mitogen.core'] = ( None, 'mitogen/core.py', zlib.compress(core_src), )
def SaveModuleBP(): global codemap try: modname = AskStr('', 'module name : ') bpo = '' for e in Functions(): func = e.startEA length = e.endEA - e.startEA if length < codemap.func_min_size: continue offset = func - get_imagebase() bpo += str(offset) + '\n' print 'bp offset generation complete! ' + str(len(bpo)) payload = bpo with open(codemap.homedir + modname + '.bpo', 'wb') as f: f.write(zlib.compress(payload)) except: traceback.print_exc(file=sys.stdout)
def compress_zip(fd, # type: BinaryIO image, # type: np.ndarray depth, # type: int version # type: int ): # type: (...) -> None """ Write a Numpy array to a zip (zlib) compressed stream. {} """ image = normalize_image(image, depth) if util.needs_byteswap(image): compressor = zlib.compressobj() for row in image: row = util.do_byteswap(row) fd.write(compressor.compress(row)) fd.write(compressor.flush()) else: fd.write(zlib.compress(image))
def compress_constant_zip(fd, # type: BinaryIO value, # type: int width, # type: int rows, # type: int depth, # type: int version # type: int ): # type: (...) -> None """ Write a virtual image containing a constant to a zip compressed stream. {} """ if depth == 1: image = _make_onebit_constant(value, width, rows) compress_zip(fd, image, depth, version) else: row = _make_constant_row(value, width, depth) row = row.tobytes() fd.write(zlib.compress(row * rows))
def to_png(self, data, output): ''' Dump data to the image file. Data is bytes(RGBRGB...RGB). Pure python PNG implementation. http://inaps.org/journal/comment-fonctionne-le-png ''' p__ = pack line = self.width * 3 png_filter = p__('>B', 0) scanlines = b''.join( [png_filter + data[y * line:y * line + line] for y in range(self.height)]) magic = p__('>8B', 137, 80, 78, 71, 13, 10, 26, 10) # Header: size, marker, data, CRC32 ihdr = [b'', b'IHDR', b'', b''] ihdr[2] = p__('>2I5B', self.width, self.height, 8, 2, 0, 0, 0) ihdr[3] = p__('>I', crc32(b''.join(ihdr[1:3])) & 0xffffffff) ihdr[0] = p__('>I', len(ihdr[2])) # Data: size, marker, data, CRC32 idat = [b'', b'IDAT', compress(scanlines), b''] idat[3] = p__('>I', crc32(b''.join(idat[1:3])) & 0xffffffff) idat[0] = p__('>I', len(idat[2])) # Footer: size, marker, None, CRC32 iend = [b'', b'IEND', b'', b''] iend[3] = p__('>I', crc32(iend[1]) & 0xffffffff) iend[0] = p__('>I', len(iend[2])) with open(output, 'wb') as fileh: fileh.write(magic) fileh.write(b''.join(ihdr)) fileh.write(b''.join(idat)) fileh.write(b''.join(iend)) return err = 'Error writing data to "{0}".'.format(output) raise ScreenshotError(err)
def compress(data): return zlib.compress(data)
def read_file(filename): filename_path = os.path.join('/etc/ceph', filename) if not os.path.exists(filename_path): json_exit("file not found: {}".format(filename_path), failed=True) if not os.access(filename_path, os.R_OK): json_exit("file not readable: {}".format(filename_path), failed=True) with open(filename_path, 'rb') as f: raw_data = f.read() return {'content': base64.b64encode(zlib.compress(raw_data)), 'sha1': hashlib.sha1(raw_data).hexdigest(), 'filename': filename}
def copy_to_host(module): compress = module.params.get('compress') dest = module.params.get('dest') mode = int(module.params.get('mode'), 0) sha1 = module.params.get('sha1') src = module.params.get('src') data = base64.b64decode(src) raw_data = zlib.decompress(data) if compress else data if sha1: if os.path.exists(dest): if os.access(dest, os.R_OK): with open(dest, 'rb') as f: if hashlib.sha1(f.read()).hexdigest() == sha1: module.exit_json(changed=False) else: module.exit_json(failed=True, changed=False, msg='file is not accessible: {}'.format(dest)) if sha1 != hashlib.sha1(raw_data).hexdigest(): module.exit_json(failed=True, changed=False, msg='sha1 sum does not match data') with os.fdopen(os.open(dest, os.O_WRONLY | os.O_CREAT, mode), 'wb') as f: f.write(raw_data) module.exit_json(changed=True)
def dumps(self, response, body=None): if body is None: body = response.read(decode_content=False) # NOTE: 99% sure this is dead code. I'm only leaving it # here b/c I don't have a test yet to prove # it. Basically, before using # `cachecontrol.filewrapper.CallbackFileWrapper`, # this made an effort to reset the file handle. The # `CallbackFileWrapper` short circuits this code by # setting the body as the content is consumed, the # result being a `body` argument is *always* passed # into cache_response, and in turn, # `Serializer.dump`. response._fp = io.BytesIO(body) data = { "response": { "body": _b64_encode_bytes(body), "headers": dict( (_b64_encode(k), _b64_encode(v)) for k, v in response.headers.items() ), "status": response.status, "version": response.version, "reason": _b64_encode_str(response.reason), "strict": response.strict, "decode_content": response.decode_content, }, } return zlib.compress( json.dumps( data, separators=(",", ":"), sort_keys=True, ).encode("utf8"), )
def set_internal(self, key, value): # type: (str, bytes) -> None value = zlib.compress(value, self.ZLIB_COMPRESSION_LEVEL) self.kvstore.put(key, value)
def encode_npz(subvol): """ This file format is unrelated to np.savez We are just saving as .npy and the compressing using zlib. The .npy format contains metadata indicating shape and dtype, instead of np.tobytes which doesn't contain any metadata. """ fileobj = io.BytesIO() if len(subvol.shape) == 3: subvol = np.expand_dims(subvol, 0) np.save(fileobj, subvol) cdz = zlib.compress(fileobj.getvalue()) return cdz
def add_itxt(self, key, value, lang="", tkey="", zip=False): """Appends an iTXt chunk. :param key: latin-1 encodable text key name :param value: value for this key :param lang: language code :param tkey: UTF-8 version of the key name :param zip: compression flag """ if not isinstance(key, bytes): key = key.encode("latin-1", "strict") if not isinstance(value, bytes): value = value.encode("utf-8", "strict") if not isinstance(lang, bytes): lang = lang.encode("utf-8", "strict") if not isinstance(tkey, bytes): tkey = tkey.encode("utf-8", "strict") if zip: self.add(b"iTXt", key + b"\0\x01\0" + lang + b"\0" + tkey + b"\0" + zlib.compress(value)) else: self.add(b"iTXt", key + b"\0\0\0" + lang + b"\0" + tkey + b"\0" + value)
def add_text(self, key, value, zip=0): """Appends a text chunk. :param key: latin-1 encodable text key name :param value: value for this key, text or an :py:class:`PIL.PngImagePlugin.iTXt` instance :param zip: compression flag """ if isinstance(value, iTXt): return self.add_itxt(key, value, value.lang, value.tkey, bool(zip)) # The tEXt chunk stores latin-1 text if not isinstance(value, bytes): try: value = value.encode('latin-1', 'strict') except UnicodeError: return self.add_itxt(key, value, zip=bool(zip)) if not isinstance(key, bytes): key = key.encode('latin-1', 'strict') if zip: self.add(b"zTXt", key + b"\0\0" + zlib.compress(value)) else: self.add(b"tEXt", key + b"\0" + value) # -------------------------------------------------------------------- # PNG image stream (IHDR/IEND)
def compress(data, level=ZLIB_COMPRESSION_LEVEL): """ Compress 'data' to Zlib format. If 'USE_ZOPFLI' variable is True, zopfli is used instead of the zlib module. The compression 'level' must be between 0 and 9. 1 gives best speed, 9 gives best compression (0 gives no compression at all). The default value is a compromise between speed and compression (6). """ if not (0 <= level <= 9): raise ValueError('Bad compression level: %s' % level) if not USE_ZOPFLI or level == 0: from zlib import compress return compress(data, level) else: from zopfli.zlib import compress return compress(data, numiterations=ZOPFLI_LEVELS[level])
def encodeData(self, data): self.origLength = len(data) if not self.uncompressed: compressedData = compress(data, self.zlibCompressionLevel) if self.uncompressed or len(compressedData) >= self.origLength: # Encode uncompressed rawData = data self.length = self.origLength else: rawData = compressedData self.length = len(rawData) return rawData
def picklechops(chops): """Pickles and base64encodes it's argument chops""" value = zlib.compress(dumps(chops)) encoded = base64.encodestring(value) return encoded.strip()
def surfaceData(srf, compress=zlib.compress): "Convert surface to bytes data with optional compression" if not isinstance(srf, pygame.Surface): srf = srf.image w, h = srf.get_size() a = hasAlpha(srf) mode = (1 if a else 0) + (2 if compress else 0) mode = struct.pack("!3I", mode, w, h) data = pygame.image.tostring(srf, "RGBA" if a else "RGB") return (compress(data) if compress else data), mode
def testExtraPixels(self): """Test file that contains too many pixels.""" def eachchunk(chunk): if chunk[0] != 'IDAT': return chunk data = zlib.decompress(chunk[1]) data += strtobytes('\x00garbage') data = zlib.compress(data) chunk = (chunk[0], data) return chunk self.assertRaises(FormatError, self.helperFormat, eachchunk)
def testNotEnoughPixels(self): def eachchunk(chunk): if chunk[0] != 'IDAT': return chunk # Remove last byte. data = zlib.decompress(chunk[1]) data = data[:-1] data = zlib.compress(data) return (chunk[0], data) self.assertRaises(FormatError, self.helperFormat, eachchunk)
def testBadFilter(self): def eachchunk(chunk): if chunk[0] != 'IDAT': return chunk data = zlib.decompress(chunk[1]) # Corrupt the first filter byte data = strtobytes('\x99') + data[1:] data = zlib.compress(data) return (chunk[0], data) self.assertRaises(FormatError, self.helperFormat, eachchunk)
def dump_payload(self, obj): json = super(URLSafeSerializerMixin, self).dump_payload(obj) is_compressed = False compressed = zlib.compress(json) if len(compressed) < (len(json) - 1): json = compressed is_compressed = True base64d = base64_encode(json) if is_compressed: base64d = b'.' + base64d return base64d
def _save(path, obj): "Save an object to the specified path." data = zlib.compress(pickletools.optimize(pickle.dumps(obj)), 9) with open(path, 'wb') as file: file.write(data)
def quit_game(event=None): # Save HST and quit program. file(HST_FILE, 'wb').write(zlib.compress(repr(HS_database), 9)) root.quit() ################################################################################ # HST PREPARATION FUNCTIONS
def asciiCompress(data, level=9): """ compress data to printable ascii-code """ code = zlib.compress(data,level) csum = zlib.crc32(code) code = base64.encodestring(code) return code, csum
def bz2_pack(source): "Returns 'source' as a bzip2-compressed, self-extracting python script." import bz2, base64 out = "" compressed_source = bz2.compress(source) out += 'import bz2, base64\n' out += "exec bz2.decompress(base64.b64decode('" out += base64.b64encode((compressed_source)) out += "'))\n" return out
def gz_pack(source): "Returns 'source' as a gzip-compressed, self-extracting python script." import zlib, base64 out = "" compressed_source = zlib.compress(source) out += 'import zlib, base64\n' out += "exec zlib.decompress(base64.b64decode('" out += base64.b64encode((compressed_source)) out += "'))\n" return out # The test.+() functions below are for testing pyminifer...
def load(cls, path): # Loads programs and handles optimized files. ws = path + '.ws' cp = path + '.wso' compiled = False if os.path.isfile(cp): compiled = True if os.path.isfile(ws): if os.path.getmtime(ws) > os.path.getmtime(cp): compiled = False final = cls._final() cls._check(final) if compiled: try: with open(cp, 'rb') as file: code = file.read(len(final)) cls._check(code) data = file.read() return cls(pickle.loads(zlib.decompress(data))) except: pass data = load(ws) code = trinary(data) program = parse(code) serialized = pickle.dumps(program, pickle.HIGHEST_PROTOCOL) optimized = zlib.compress(serialized, 9) with open(cp, 'wb') as file: file.write(final + optimized) return cls(program)