我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用lzma.LZMADecompressor()。
def _decompress(self, chunk_size=32000):
    """Decompress the cluster if the compression flag was found.

    Seeks ``self.file_buf`` to ``self.ptr + 1``, feeds the data found there
    to an LZMA decompressor ``chunk_size`` bytes at a time, and stores the
    uncompressed result in ``self.uncomp_buf``.

    Args:
        chunk_size: Number of compressed bytes read from ``self.file_buf``
            per iteration.

    Returns:
        The ``BytesIO`` holding the uncompressed data (its position is left
        at the end of what was written), or ``None`` when
        ``self.compressed`` is false, in which case nothing is read or
        stored.

    Raises:
        lzma.LZMAError: If the data is not a valid stream, or if
            ``self.file_buf`` runs out of data before the end-of-stream
            marker is reached.
    """
    if not self.compressed:
        return None

    # NOTE(review): the +1 presumably skips a one-byte header in front of
    # the compressed payload -- confirm against the container format.
    self.file_buf.seek(self.ptr + 1)

    # Store uncompressed cluster data for use as uncompressed data
    self.uncomp_buf = BytesIO()
    decomp = lzma.LZMADecompressor()
    while not decomp.eof:
        comp_data = self.file_buf.read(chunk_size)
        if not comp_data:
            # An exhausted source never flips ``decomp.eof``; without this
            # check a truncated stream would loop forever.
            raise lzma.LZMAError(
                "Compressed data ended before the "
                "end-of-stream marker was reached")
        self.uncomp_buf.write(decomp.decompress(comp_data))

    return self.uncomp_buf
def _decompress(self, chunk_size=32000):
    """Decompress the cluster if the compression flag was found.

    Seeks ``self.file_buf`` to ``self.ptr + 1``, feeds the data found there
    to an LZMA decompressor ``chunk_size`` bytes at a time, and stores the
    uncompressed result in ``self.uncomp_buf``.

    Args:
        chunk_size: Number of compressed bytes read from ``self.file_buf``
            per iteration.

    Returns:
        The ``StringIO`` holding the uncompressed data, or ``None`` when
        ``self.compressed`` is false, in which case nothing is read or
        stored.

    Raises:
        lzma.LZMAError: If the data is not a valid stream, or if
            ``self.file_buf`` runs out of data before the end-of-stream
            marker is reached.
    """
    if not self.compressed:
        return None

    # NOTE(review): the +1 presumably skips a one-byte header in front of
    # the compressed payload -- confirm against the container format.
    self.file_buf.seek(self.ptr + 1)

    # Store uncompressed cluster data for use as uncompressed data
    # NOTE(review): the decompressor yields bytes; writing them into a
    # StringIO only works with a byte-string based StringIO (Python 2
    # style). With Python 3's io.StringIO the write below raises TypeError
    # -- confirm which StringIO this module imports.
    self.uncomp_buf = StringIO()
    decomp = lzma.LZMADecompressor()
    while not decomp.eof:
        comp_data = self.file_buf.read(chunk_size)
        if not comp_data:
            # An exhausted source never flips ``decomp.eof``; without this
            # check a truncated stream would loop forever.
            raise lzma.LZMAError(
                "Compressed data ended before the "
                "end-of-stream marker was reached")
        self.uncomp_buf.write(decomp.decompress(comp_data))

    return self.uncomp_buf
def test_roundtrip(self):
    # compress() followed by decompress() must give back INPUT for the
    # default settings, for explicit XZ and ALONE containers (decoded with
    # default arguments), and for a raw stream, which needs the same
    # format and filter chain on both sides.
    raw_options = {"format": lzma.FORMAT_RAW, "filters": FILTERS_RAW_4}
    scenarios = (
        ({}, {}),
        ({"format": lzma.FORMAT_XZ}, {}),
        ({"format": lzma.FORMAT_ALONE}, {}),
        (raw_options, raw_options),
    )
    for compress_options, decompress_options in scenarios:
        packed = lzma.compress(INPUT, **compress_options)
        unpacked = lzma.decompress(packed, **decompress_options)
        self.assertEqual(unpacked, INPUT)

# Unlike LZMADecompressor, decompress() *does* handle concatenated streams.
def decompress_lzma(data):
    """Decompress one or more concatenated LZMA/XZ streams in *data*.

    A fresh auto-detecting decompressor is created for each stream; whatever
    a decompressor leaves in ``unused_data`` is treated as the start of the
    next stream.

    Args:
        data: Bytes-like object holding the compressed input.

    Returns:
        The concatenated uncompressed contents of every stream decoded.
        Trailing data that is not a valid stream is ignored once at least
        one stream has been decoded.

    Raises:
        LZMAError: If the first stream is invalid, or if the input ends
            before a stream's end-of-stream marker is reached.
    """
    results = []
    while True:
        decomp = LZMADecompressor(FORMAT_AUTO, None, None)
        try:
            res = decomp.decompress(data)
        except LZMAError:
            if results:
                break  # Leftover data is not a valid LZMA/XZ stream; ignore it.
            raise  # Error on the first iteration; bail out.
        results.append(res)
        # Check for truncation right here, on the decompressor that actually
        # consumed the data: checking after the loop would either be skipped
        # for truncated input or be applied to the fresh decompressor that
        # merely rejected trailing junk.
        if not decomp.eof:
            raise LZMAError("Compressed data ended before the end-of-stream marker was reached")
        data = decomp.unused_data
        if not data:
            break
    return b"".join(results)
def decompress(self, buf):
    """Return the contents of *buf* decompressed according to this object.

    When ``self.compressed`` is false, *buf* itself is returned untouched.
    Otherwise the data is read from *buf* and the result is wrapped in a
    new ``BytesIO``:

    * LZMA: a 5-byte header (one properties byte and a little-endian
      32-bit dictionary size) is unpacked, the properties byte is split
      into lc/lp/pb, and everything left in *buf* is decoded as a raw
      LZMA1 stream.
    * LZ4 / LZ4HC: ``self.compressed_size`` bytes are read and passed to
      ``lz4_decompress`` together with ``self.uncompressed_size``.

    Raises:
        NotImplementedError: For any other compression type.
    """
    if not self.compressed:
        return buf

    kind = self.compression_type

    if kind == CompressionType.LZMA:
        header = buf.read(5)
        props, dict_size = struct.unpack("<BI", header)
        # props == (pb * 5 + lp) * 9 + lc
        props, lc = divmod(props, 9)
        pb, lp = divmod(props, 5)
        lzma1_filter = {
            "id": lzma.FILTER_LZMA1,
            "dict_size": dict_size,
            "lc": lc,
            "lp": lp,
            "pb": pb,
        }
        decoder = lzma.LZMADecompressor(
            format=lzma.FORMAT_RAW, filters=[lzma1_filter])
        return BytesIO(decoder.decompress(buf.read()))

    if kind in (CompressionType.LZ4, CompressionType.LZ4HC):
        unpacked = lz4_decompress(
            buf.read(self.compressed_size), self.uncompressed_size)
        return BytesIO(unpacked)

    raise NotImplementedError("Unimplemented compression method: %r" % (kind))
def test_simple_bad_args(self):
    # Constructor arguments of the wrong type are rejected with TypeError.
    bad_compressor_calls = (
        (([],), {}),
        ((), {"format": 3.45}),
        ((), {"check": ""}),
        ((), {"preset": "asdf"}),
        ((), {"filters": 3}),
    )
    for bad_args, bad_kwargs in bad_compressor_calls:
        with self.assertRaises(TypeError):
            LZMACompressor(*bad_args, **bad_kwargs)

    # Can't specify FORMAT_AUTO when compressing.
    with self.assertRaises(ValueError):
        LZMACompressor(format=lzma.FORMAT_AUTO)
    # Can't specify a preset and a custom filter chain at the same time.
    self.assertRaises(ValueError, LZMACompressor,
                      preset=7, filters=[{"id": lzma.FILTER_LZMA2}])

    with self.assertRaises(TypeError):
        LZMADecompressor(())
    with self.assertRaises(TypeError):
        LZMADecompressor(memlimit=b"qw")
    self.assertRaises(TypeError, LZMADecompressor,
                      lzma.FORMAT_RAW, filters="zzz")
    # Cannot specify a memory limit with FORMAT_RAW.
    self.assertRaises(ValueError, LZMADecompressor,
                      lzma.FORMAT_RAW, memlimit=0x1000000)
    # Can only specify a custom filter chain with FORMAT_RAW.
    with self.assertRaises(ValueError):
        LZMADecompressor(filters=FILTERS_RAW_1)
    for container in (lzma.FORMAT_XZ, lzma.FORMAT_ALONE):
        self.assertRaises(ValueError, LZMADecompressor,
                          format=container, filters=FILTERS_RAW_1)

    # Method-level argument checks on a compressor, before and after flush().
    compressor = LZMACompressor()
    with self.assertRaises(TypeError):
        compressor.compress()
    with self.assertRaises(TypeError):
        compressor.compress(b"foo", b"bar")
    with self.assertRaises(TypeError):
        compressor.flush(b"blah")
    empty = compressor.flush()
    with self.assertRaises(ValueError):
        compressor.compress(b"quux")
    with self.assertRaises(ValueError):
        compressor.flush()

    # Method-level argument checks on a decompressor; once the (empty)
    # stream has been fully consumed, further input raises EOFError.
    decompressor = LZMADecompressor()
    with self.assertRaises(TypeError):
        decompressor.decompress()
    with self.assertRaises(TypeError):
        decompressor.decompress(b"foo", b"bar")
    decompressor.decompress(empty)
    with self.assertRaises(EOFError):
        decompressor.decompress(b"quux")
def test_decompressor_after_eof(self):
    # After COMPRESSED_XZ has been passed to the decompressor, a further
    # decompress() call must raise EOFError.
    decompressor = LZMADecompressor()
    decompressor.decompress(COMPRESSED_XZ)
    with self.assertRaises(EOFError):
        decompressor.decompress(b"nyan")
def test_decompressor_memlimit(self):
    # With memlimit=1024, decompressing the fixtures must fail with
    # LZMAError for auto-detection, explicit XZ and explicit ALONE.
    cases = (
        ((), COMPRESSED_XZ),
        ((lzma.FORMAT_XZ,), COMPRESSED_XZ),
        ((lzma.FORMAT_ALONE,), COMPRESSED_ALONE),
    )
    for format_args, blob in cases:
        limited = LZMADecompressor(*format_args, memlimit=1024)
        with self.assertRaises(LZMAError):
            limited.decompress(blob)

# Test LZMADecompressor on known-good input data.
def test_decompressor_auto(self):
    # A default-constructed decompressor is run through the shared
    # _test_decompressor helper for both the XZ and the ALONE fixture.
    # NOTE(review): the third argument is presumably the integrity check
    # the helper expects to see reported -- confirm in _test_decompressor.
    fixtures = (
        (COMPRESSED_XZ, lzma.CHECK_CRC64),
        (COMPRESSED_ALONE, lzma.CHECK_NONE),
    )
    for blob, check_id in fixtures:
        self._test_decompressor(LZMADecompressor(), blob, check_id)
def test_decompressor_xz(self):
    # Run the shared helper on the XZ fixture with the container format
    # given explicitly instead of auto-detected.
    self._test_decompressor(
        LZMADecompressor(format=lzma.FORMAT_XZ),
        COMPRESSED_XZ,
        lzma.CHECK_CRC64,
    )
def test_decompressor_raw_1(self):
    # Raw stream #1: decoded with FORMAT_RAW and its matching filter chain.
    self._test_decompressor(
        LZMADecompressor(format=lzma.FORMAT_RAW, filters=FILTERS_RAW_1),
        COMPRESSED_RAW_1,
        lzma.CHECK_NONE,
    )
def test_decompressor_raw_2(self):
    # Raw stream #2: decoded with FORMAT_RAW and its matching filter chain.
    self._test_decompressor(
        LZMADecompressor(format=lzma.FORMAT_RAW, filters=FILTERS_RAW_2),
        COMPRESSED_RAW_2,
        lzma.CHECK_NONE,
    )
def test_decompressor_raw_3(self):
    # Raw stream #3: decoded with FORMAT_RAW and its matching filter chain.
    self._test_decompressor(
        LZMADecompressor(format=lzma.FORMAT_RAW, filters=FILTERS_RAW_3),
        COMPRESSED_RAW_3,
        lzma.CHECK_NONE,
    )
def test_decompressor_raw_4(self):
    # Raw stream #4: decoded with FORMAT_RAW and its matching filter chain.
    self._test_decompressor(
        LZMADecompressor(format=lzma.FORMAT_RAW, filters=FILTERS_RAW_4),
        COMPRESSED_RAW_4,
        lzma.CHECK_NONE,
    )
def test_decompressor_chunks(self):
    # Feed COMPRESSED_XZ to a single decompressor ten bytes at a time.
    # eof must stay false before every chunk is fed; afterwards the joined
    # output must equal INPUT, the check must be CRC64, eof must be set and
    # no unused data may remain.
    decompressor = LZMADecompressor()
    step = 10
    pieces = []
    offset = 0
    while offset < len(COMPRESSED_XZ):
        self.assertFalse(decompressor.eof)
        chunk = COMPRESSED_XZ[offset:offset + step]
        pieces.append(decompressor.decompress(chunk))
        offset += step
    self.assertEqual(b"".join(pieces), INPUT)
    self.assertEqual(decompressor.check, lzma.CHECK_CRC64)
    self.assertTrue(decompressor.eof)
    self.assertEqual(decompressor.unused_data, b"")
def test_decompressor_bad_input(self):
    # Each decompressor is handed a fixture in a format other than the one
    # it was configured for; decompress() must raise LZMAError every time.
    mismatches = (
        ((), {}, COMPRESSED_RAW_1),
        ((lzma.FORMAT_XZ,), {}, COMPRESSED_ALONE),
        ((lzma.FORMAT_ALONE,), {}, COMPRESSED_XZ),
        ((lzma.FORMAT_RAW,), {"filters": FILTERS_RAW_1}, COMPRESSED_XZ),
    )
    for ctor_args, ctor_kwargs, blob in mismatches:
        decompressor = LZMADecompressor(*ctor_args, **ctor_kwargs)
        with self.assertRaises(LZMAError):
            decompressor.decompress(blob)

# Test that LZMACompressor->LZMADecompressor preserves the input data.
def test_roundtrip_xz(self):
    # Compress INPUT with a default compressor, then run the result through
    # the shared helper with a default (auto-detecting) decompressor.
    compressor = LZMACompressor()
    stream = b"".join((compressor.compress(INPUT), compressor.flush()))
    self._test_decompressor(LZMADecompressor(), stream, lzma.CHECK_CRC64)
def test_roundtrip_alone(self):
    # Compress INPUT in the ALONE container, then run the result through
    # the shared helper with a default (auto-detecting) decompressor.
    compressor = LZMACompressor(format=lzma.FORMAT_ALONE)
    stream = b"".join((compressor.compress(INPUT), compressor.flush()))
    self._test_decompressor(LZMADecompressor(), stream, lzma.CHECK_NONE)
def test_roundtrip_raw(self):
    # Raw streams carry no header, so the compressor and the decompressor
    # are both built from the same format and filter chain.
    raw_options = {"format": lzma.FORMAT_RAW, "filters": FILTERS_RAW_4}
    compressor = LZMACompressor(**raw_options)
    stream = b"".join((compressor.compress(INPUT), compressor.flush()))
    self._test_decompressor(
        LZMADecompressor(**raw_options), stream, lzma.CHECK_NONE)
def test_roundtrip_chunks(self):
    # Compress INPUT ten bytes at a time, append the flush() output, and
    # run the joined stream through the shared helper.
    compressor = LZMACompressor()
    pieces = [
        compressor.compress(INPUT[start:start + 10])
        for start in range(0, len(INPUT), 10)
    ]
    pieces.append(compressor.flush())
    stream = b"".join(pieces)
    self._test_decompressor(LZMADecompressor(), stream, lzma.CHECK_CRC64)

# LZMADecompressor intentionally does not handle concatenated streams.
def test_decompressor_bigmem(self, size):
    # Build a payload of at least `size` bytes by repeating one random
    # 10 MiB block, compress it, and check that a single decompress() call
    # returns the payload unchanged.
    decompressor = LZMADecompressor()
    block_len = 10 * 1024 * 1024
    random_block = random.getrandbits(block_len * 8).to_bytes(
        block_len, "little")
    try:
        repeats = size // block_len + 1
        payload = random_block * repeats
        packed = lzma.compress(payload)
        unpacked = decompressor.decompress(packed)
        self.assertEqual(unpacked, payload)
    finally:
        # Drop the references to the large buffers, pass or fail.
        payload = packed = unpacked = None
def test_decompress_memlimit(self):
    # With memlimit=1024, the module-level decompress() must fail with
    # LZMAError for auto-detection, explicit XZ and explicit ALONE.
    attempts = (
        (COMPRESSED_XZ, {}),
        (COMPRESSED_XZ, {"format": lzma.FORMAT_XZ}),
        (COMPRESSED_ALONE, {"format": lzma.FORMAT_ALONE}),
    )
    for blob, format_kwargs in attempts:
        self.assertRaises(
            LZMAError, lzma.decompress, blob, memlimit=1024, **format_kwargs)

# Test LZMADecompressor on known-good input data.
def test_decompressor_bigmem(self, size):
    # Build a payload of at least `size` bytes by repeating one random
    # 10 MiB block, compress it, and check that a single decompress() call
    # returns the payload unchanged.
    decompressor = LZMADecompressor()
    block_len = 10 * 1024 * 1024
    random_block = random.getrandbits(block_len * 8).to_bytes(
        block_len, "little")
    try:
        repeats = size // block_len + 1
        payload = random_block * repeats
        packed = lzma.compress(payload)
        unpacked = decompressor.decompress(packed)
        self.assertEqual(unpacked, payload)
    finally:
        # Drop the references to the large buffers, pass or fail.
        payload = packed = unpacked = None

# Pickling raises an exception; there's no way to serialize an lzma_stream.
def test_pickle(self):
    # For every pickle protocol, dumping a compressor or a decompressor
    # must raise TypeError.
    for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
        for stream_cls in (LZMACompressor, LZMADecompressor):
            with self.assertRaises(TypeError):
                pickle.dumps(stream_cls(), protocol)