我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zlib.decompress()。
def run(self): print("VEDIO server starts...") self.sock.bind(self.ADDR) self.sock.listen(1) conn, addr = self.sock.accept() print("remote VEDIO client success connected...") data = "".encode("utf-8") payload_size = struct.calcsize("L") cv2.namedWindow('Remote', cv2.WINDOW_NORMAL) while True: while len(data) < payload_size: data += conn.recv(81920) packed_size = data[:payload_size] data = data[payload_size:] msg_size = struct.unpack("L", packed_size)[0] while len(data) < msg_size: data += conn.recv(81920) zframe_data = data[:msg_size] data = data[msg_size:] frame_data = zlib.decompress(zframe_data) frame = pickle.loads(frame_data) cv2.imshow('Remote', frame) if cv2.waitKey(1) & 0xFF == 27: break
def _loads_v2(self, request, data): try: cached = json.loads(zlib.decompress(data).decode("utf8")) except ValueError: return # We need to decode the items that we've base64 encoded cached["response"]["body"] = _b64_decode_bytes( cached["response"]["body"] ) cached["response"]["headers"] = dict( (_b64_decode_str(k), _b64_decode_str(v)) for k, v in cached["response"]["headers"].items() ) cached["response"]["reason"] = _b64_decode_str( cached["response"]["reason"], ) cached["vary"] = dict( (_b64_decode_str(k), _b64_decode_str(v) if v is not None else v) for k, v in cached["vary"].items() ) return self.prepare_response(request, cached)
def _loads_v2(self, data): try: cached = json.loads(zlib.decompress(data).decode("utf8")) except ValueError: return # We need to decode the items that we've base64 encoded cached["response"]["body"] = _b64_decode_bytes( cached["response"]["body"] ) cached["response"]["headers"] = dict( (_b64_decode_str(k), _b64_decode_str(v)) for k, v in cached["response"]["headers"].items() ) cached["response"]["reason"] = _b64_decode_str( cached["response"]["reason"], ) return self.prepare_response(cached)
def __init__(self, reader=None): self.majorVersion = None self.minorVersion = None self.metaData = None self.privData = None if reader: self.majorVersion = reader.majorVersion self.minorVersion = reader.minorVersion if reader.metaLength: reader.file.seek(reader.metaOffset) rawData = reader.file.read(reader.metaLength) assert len(rawData) == reader.metaLength import zlib data = zlib.decompress(rawData) assert len(data) == reader.metaOrigLength self.metaData = data if reader.privLength: reader.file.seek(reader.privOffset) data = reader.file.read(reader.privLength) assert len(data) == reader.privLength self.privData = data
def _decompressContent(response, new_content): content = new_content try: encoding = response.get('content-encoding', None) if encoding in ['gzip', 'deflate']: if encoding == 'gzip': content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read() if encoding == 'deflate': content = zlib.decompress(content) response['content-length'] = str(len(content)) # Record the historical presence of the encoding in a way the won't interfere. response['-content-encoding'] = response['content-encoding'] del response['content-encoding'] except IOError: content = "" raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content) return content
def __init__(self, srf, bg=None, decompress=zlib.decompress): if hasattr(srf, "image"): srf = srf.image t = type(srf) if t is str: srf = pygame.image.load(srf) if bg: srf = style(srf, bg) elif not hasAlpha(srf): srf = srf.convert_alpha() elif t is bytes: mode, w, h = struct.unpack("!3I", bg) if mode & 2: srf = decompress(srf) mode = "RGBA" if mode & 1 else "RGB" srf = pygame.image.fromstring(srf, (w,h), mode) elif t in (list, tuple): srf = pygame.Surface(srf, pygame.SRCALPHA) if bg: srf.fill(bg if type(bg) is pygame.Color else rgba(bg)) self.original = srf self.dumpCache()
def load_payload(self, payload): decompress = False if payload.startswith(b'.'): payload = payload[1:] decompress = True try: json = base64_decode(payload) except Exception as e: raise BadPayload('Could not base64 decode the payload because of ' 'an exception', original_error=e) if decompress: try: json = zlib.decompress(json) except Exception as e: raise BadPayload('Could not zlib decompress the payload before ' 'decoding the payload', original_error=e) return super(URLSafeSerializerMixin, self).load_payload(json)
def _loads_v2(self, request, data): try: cached = json.loads(zlib.decompress(data).decode("utf8")) except (ValueError, zlib.error): return # We need to decode the items that we've base64 encoded cached["response"]["body"] = _b64_decode_bytes( cached["response"]["body"] ) cached["response"]["headers"] = dict( (_b64_decode_str(k), _b64_decode_str(v)) for k, v in cached["response"]["headers"].items() ) cached["response"]["reason"] = _b64_decode_str( cached["response"]["reason"], ) cached["vary"] = dict( (_b64_decode_str(k), _b64_decode_str(v) if v is not None else v) for k, v in cached["vary"].items() ) return self.prepare_response(request, cached)
def load_HST(): # Load (H)igh (S)core (T)able. try: database = eval(zlib.decompress(file(HST_FILE, 'rb').read())) names = 0 records = sum(map(len, SAMPLE_HST.values())) for key in database.keys(): assert isinstance(key, int) assert isinstance(database[key], list) for name in database[key]: assert isinstance(name, str) names += 1 assert names <= records assert names == records return database except: return SAMPLE_HST
def asciiDecompress(code): """ decompress result of asciiCompress """ code = base64.decodestring(code) csum = zlib.crc32(code) data = zlib.decompress(code) return data, csum
def client_post_decrypt(self, buf): if self.raw_trans: return buf self.recv_buf += buf out_buf = b'' while len(self.recv_buf) > 2: length = struct.unpack('>H', self.recv_buf[:2])[0] if length >= 32768 or length < 6: self.raw_trans = True self.recv_buf = b'' raise Exception('client_post_decrypt data error') if length > len(self.recv_buf): break out_buf += zlib.decompress(b'x\x9c' + self.recv_buf[2:length]) self.recv_buf = self.recv_buf[length:] if out_buf: self.decrypt_packet_num += 1 return out_buf
def server_post_decrypt(self, buf): if self.raw_trans: return (buf, False) self.recv_buf += buf out_buf = b'' while len(self.recv_buf) > 2: length = struct.unpack('>H', self.recv_buf[:2])[0] if length >= 32768 or length < 6: self.raw_trans = True self.recv_buf = b'' if self.decrypt_packet_num == 0: return (b'E'*2048, False) else: raise Exception('server_post_decrype data error') if length > len(self.recv_buf): break out_buf += zlib.decompress(b'\x78\x9c' + self.recv_buf[2:length]) self.recv_buf = self.recv_buf[length:] if out_buf: self.decrypt_packet_num += 1 return (out_buf, False)
def download_sifts_xml(pdb_id, outdir='', outfile=''): """Download the SIFTS file for a PDB ID. Args: pdb_id: outdir: outfile: Returns: """ baseURL = 'ftp://ftp.ebi.ac.uk/pub/databases/msd/sifts/xml/' filename = '{}.xml.gz'.format(pdb_id) if outfile: outfile = op.join(outdir, outfile) else: outfile = op.join(outdir, filename.split('.')[0] + '.sifts.xml') if not op.exists(outfile): response = urlopen(baseURL + filename) with open(outfile, 'wb') as f: f.write(gzip.decompress(response.read())) return outfile
def send(self, *args, **kwargs): # In raven<6, args = (data, headers). # In raven 6.x args = (url, data, headers) if len(args) == 2: data, _ = args elif len(args) == 3: _, data, _ = args else: raise Exception('raven Transport.send api seems to have changed') raw = json.loads(zlib.decompress(data).decode('utf8')) # to make asserting easier, parse json strings into python strings for k, v in list(raw['extra'].items()): try: val = ast.literal_eval(v) raw['extra'][k] = val except Exception: pass self.messages.append(raw)
def loads(s, key=None, salt='django.core.signing', serializer=JSONSerializer, max_age=None): """ Reverse of dumps(), raises BadSignature if signature fails. The serializer is expected to accept a bytestring. """ # TimestampSigner.unsign always returns unicode but base64 and zlib # compression operate on bytes. base64d = force_bytes(TimestampSigner(key, salt=salt).unsign(s, max_age=max_age)) decompress = False if base64d[:1] == b'.': # It's compressed; uncompress it first base64d = base64d[1:] decompress = True data = b64_decode(base64d) if decompress: data = zlib.decompress(data) return serializer().loads(data)
def retrieve_content(url, data=None): """ Retrieves page content from given URL """ try: req = urllib2.Request("".join(url[i].replace(' ', "%20") if i > url.find('?') else url[i] for i in xrange(len(url))), data, {"User-agent": NAME, "Accept-encoding": "gzip, deflate"}) resp = urllib2.urlopen(req, timeout=TIMEOUT) retval = resp.read() encoding = resp.headers.get("Content-Encoding") if encoding: if encoding.lower() == "deflate": data = StringIO.StringIO(zlib.decompress(retval, -15)) else: data = gzip.GzipFile("", "rb", 9, StringIO.StringIO(retval)) retval = data.read() except Exception, ex: retval = ex.read() if hasattr(ex, "read") else getattr(ex, "msg", str()) return retval or ""
def _unpack(self): with self._unpackLock: if hasattr(self, '_pwbuffer'): pwbuffer = zlib.decompress(self._pwbuffer) pwbuffer = pwbuffer.split(self._delimiter) assert len(pwbuffer) == self._numElems md = hashlib.md5() md.update(self.essid) if self._magic == 'PYR2': md.update(self._pmkbuffer) md.update(self._pwbuffer) else: md.update(self._pmkbuffer) md.update(''.join(pwbuffer)) if md.digest() != self._digest: raise DigestError("Digest check failed") self.results = zip(pwbuffer, util.grouper(self._pmkbuffer, 32)) assert len(self.results) == self._numElems del self._pwbuffer del self._digest del self._magic del self._delimiter
def test_binary_string(self): # Binary strings should be cacheable cache = self.cache from zlib import compress, decompress value = 'value_to_be_compressed' compressed_value = compress(value.encode()) # Test set cache.set('binary1', compressed_value) compressed_result = cache.get('binary1') self.assertEqual(compressed_value, compressed_result) self.assertEqual(value, decompress(compressed_result).decode()) # Test add cache.add('binary1-add', compressed_value) compressed_result = cache.get('binary1-add') self.assertEqual(compressed_value, compressed_result) self.assertEqual(value, decompress(compressed_result).decode()) # Test set_many cache.set_many({'binary1-set_many': compressed_value}) compressed_result = cache.get('binary1-set_many') self.assertEqual(compressed_value, compressed_result) self.assertEqual(value, decompress(compressed_result).decode())
def client_post_decrypt(self, buf): if self.raw_trans: return buf self.recv_buf += buf out_buf = b'' while len(self.recv_buf) > 2: length = struct.unpack('>H', self.recv_buf[:2])[0] if length >= 32768: self.raw_trans = True self.recv_buf = b'' if self.decrypt_packet_num == 0: return None else: raise Exception('server_post_decrype data error') if length > len(self.recv_buf): break out_buf += zlib.decompress(b'x\x9c' + self.recv_buf[2:length]) self.recv_buf = self.recv_buf[length:] if out_buf: self.decrypt_packet_num += 1 return out_buf
def server_post_decrypt(self, buf): if self.raw_trans: return buf self.recv_buf += buf out_buf = b'' while len(self.recv_buf) > 2: length = struct.unpack('>H', self.recv_buf[:2])[0] if length >= 32768: self.raw_trans = True self.recv_buf = b'' if self.decrypt_packet_num == 0: return None else: raise Exception('server_post_decrype data error') if length > len(self.recv_buf): break out_buf += zlib.decompress(b'\x78\x9c' + self.recv_buf[2:length]) self.recv_buf = self.recv_buf[length:] if out_buf: self.decrypt_packet_num += 1 return out_buf
def compressedField(field): # Decorator for compressed fields: def fget(self): data = getattr(self, field) if data is None: return None return zlib.decompress(data) def fset(self, value): setattr(self, field, zlib.compress(value.encode())) def fdel(self): delattr(self, field) return {'doc': "The compression property for %s." % field, 'fget': fget, 'fset': fset, 'fdel': fdel}
def _decompressContent(response, new_content): content = new_content try: encoding = response.get('content-encoding', None) if encoding in ['gzip', 'deflate']: if encoding == 'gzip': content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read() if encoding == 'deflate': content = zlib.decompress(content, -zlib.MAX_WBITS) response['content-length'] = str(len(content)) # Record the historical presence of the encoding in a way the won't interfere. response['-content-encoding'] = response['content-encoding'] del response['content-encoding'] except IOError: content = "" raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content) return content
def _decompressContent(response, new_content): content = new_content try: encoding = response.get('content-encoding', None) if encoding in ['gzip', 'deflate']: if encoding == 'gzip': content = gzip.GzipFile(fileobj=io.BytesIO(new_content)).read() if encoding == 'deflate': content = zlib.decompress(content, -zlib.MAX_WBITS) response['content-length'] = str(len(content)) # Record the historical presence of the encoding in a way the won't interfere. response['-content-encoding'] = response['content-encoding'] del response['content-encoding'] except IOError: content = "" raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content) return content
def secure_loads(data, encryption_key, hash_key=None, compression_level=None): components = data.count(b':') if components == 1: return secure_loads_deprecated(data, encryption_key, hash_key, compression_level) if components != 2: return None version, signature, encrypted_data = data.split(b':', 2) if version != b'hmac256': return None encryption_key = to_bytes(encryption_key) if not hash_key: hash_key = hashlib.sha256(encryption_key).digest() actual_signature = hmac.new(to_bytes(hash_key), encrypted_data, hashlib.sha256).hexdigest() if not compare(to_native(signature), actual_signature): return None encrypted_data = base64.urlsafe_b64decode(encrypted_data) IV, encrypted_data = encrypted_data[:16], encrypted_data[16:] cipher, _ = AES_new(pad(encryption_key)[:32], IV=IV) try: data = unpad(AES_dec(cipher, encrypted_data)) if compression_level: data = zlib.decompress(data) return pickle.loads(data) except Exception as e: return None