我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用lzma.LZMACompressor()。
def test_simple_bad_args(self): self.assertRaises(TypeError, LZMACompressor, []) self.assertRaises(TypeError, LZMACompressor, format=3.45) self.assertRaises(TypeError, LZMACompressor, check="") self.assertRaises(TypeError, LZMACompressor, preset="asdf") self.assertRaises(TypeError, LZMACompressor, filters=3) # Can't specify FORMAT_AUTO when compressing. self.assertRaises(ValueError, LZMACompressor, format=lzma.FORMAT_AUTO) # Can't specify a preset and a custom filter chain at the same time. with self.assertRaises(ValueError): LZMACompressor(preset=7, filters=[{"id": lzma.FILTER_LZMA2}]) self.assertRaises(TypeError, LZMADecompressor, ()) self.assertRaises(TypeError, LZMADecompressor, memlimit=b"qw") with self.assertRaises(TypeError): LZMADecompressor(lzma.FORMAT_RAW, filters="zzz") # Cannot specify a memory limit with FILTER_RAW. with self.assertRaises(ValueError): LZMADecompressor(lzma.FORMAT_RAW, memlimit=0x1000000) # Can only specify a custom filter chain with FILTER_RAW. self.assertRaises(ValueError, LZMADecompressor, filters=FILTERS_RAW_1) with self.assertRaises(ValueError): LZMADecompressor(format=lzma.FORMAT_XZ, filters=FILTERS_RAW_1) with self.assertRaises(ValueError): LZMADecompressor(format=lzma.FORMAT_ALONE, filters=FILTERS_RAW_1) lzc = LZMACompressor() self.assertRaises(TypeError, lzc.compress) self.assertRaises(TypeError, lzc.compress, b"foo", b"bar") self.assertRaises(TypeError, lzc.flush, b"blah") empty = lzc.flush() self.assertRaises(ValueError, lzc.compress, b"quux") self.assertRaises(ValueError, lzc.flush) lzd = LZMADecompressor() self.assertRaises(TypeError, lzd.decompress) self.assertRaises(TypeError, lzd.decompress, b"foo", b"bar") lzd.decompress(empty) self.assertRaises(EOFError, lzd.decompress, b"quux")
def test_bad_filter_spec(self): self.assertRaises(TypeError, LZMACompressor, filters=[b"wobsite"]) self.assertRaises(ValueError, LZMACompressor, filters=[{"xyzzy": 3}]) self.assertRaises(ValueError, LZMACompressor, filters=[{"id": 98765}]) with self.assertRaises(ValueError): LZMACompressor(filters=[{"id": lzma.FILTER_LZMA2, "foo": 0}]) with self.assertRaises(ValueError): LZMACompressor(filters=[{"id": lzma.FILTER_DELTA, "foo": 0}]) with self.assertRaises(ValueError): LZMACompressor(filters=[{"id": lzma.FILTER_X86, "foo": 0}])
def test_decompressor_bad_input(self): lzd = LZMADecompressor() self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_RAW_1) lzd = LZMADecompressor(lzma.FORMAT_XZ) self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_ALONE) lzd = LZMADecompressor(lzma.FORMAT_ALONE) self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_XZ) lzd = LZMADecompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_1) self.assertRaises(LZMAError, lzd.decompress, COMPRESSED_XZ) # Test that LZMACompressor->LZMADecompressor preserves the input data.
def test_roundtrip_xz(self): lzc = LZMACompressor() cdata = lzc.compress(INPUT) + lzc.flush() lzd = LZMADecompressor() self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64)
def test_roundtrip_alone(self): lzc = LZMACompressor(lzma.FORMAT_ALONE) cdata = lzc.compress(INPUT) + lzc.flush() lzd = LZMADecompressor() self._test_decompressor(lzd, cdata, lzma.CHECK_NONE)
def test_roundtrip_chunks(self): lzc = LZMACompressor() cdata = [] for i in range(0, len(INPUT), 10): cdata.append(lzc.compress(INPUT[i:i+10])) cdata.append(lzc.flush()) cdata = b"".join(cdata) lzd = LZMADecompressor() self._test_decompressor(lzd, cdata, lzma.CHECK_CRC64) # LZMADecompressor intentionally does not handle concatenated streams.
def test_compressor_bigmem(self, size): lzc = LZMACompressor() cdata = lzc.compress(b"x" * size) + lzc.flush() ddata = lzma.decompress(cdata) try: self.assertEqual(len(ddata), size) self.assertEqual(len(ddata.strip(b"x")), 0) finally: ddata = None
def test_pickle(self): for proto in range(pickle.HIGHEST_PROTOCOL + 1): with self.assertRaises(TypeError): pickle.dumps(LZMACompressor(), proto) with self.assertRaises(TypeError): pickle.dumps(LZMADecompressor(), proto)
def test_pickle(self): with self.assertRaises(TypeError): pickle.dumps(LZMACompressor()) with self.assertRaises(TypeError): pickle.dumps(LZMADecompressor())
def __init__(self, algorithm, target_file = None): self.algorithm = algorithm self.bytes_in = 0 self.bytes_out = 0 if self.algorithm is None: self.compressor = None elif self.algorithm == "lzma": if not lzma_available: raise BadCompressionException("LZMA (xz) compressor not available.") self.compressor = lzma.LZMACompressor() else: raise BadCompressionException("Uknown compression algorithm '%s'." % self.algorithm) self.out_file = open(target_file, "wb") \ if target_file is not None else None
def __init__(self, name, mode, comptype, fileobj, bufsize): """Construct a _Stream object. """ self._extfileobj = True if fileobj is None: fileobj = _LowLevelFile(name, mode) self._extfileobj = False if comptype == '*': # Enable transparent compression detection for the # stream interface fileobj = _StreamProxy(fileobj) comptype = fileobj.getcomptype() self.name = name or "" self.mode = mode self.comptype = comptype self.fileobj = fileobj self.bufsize = bufsize self.buf = "" self.pos = 0 self.closed = False if comptype == "gz": try: import zlib except ImportError: raise CompressionError("zlib module is not available") self.zlib = zlib self.crc = zlib.crc32("") if mode == "r": self._init_read_gz() else: self._init_write_gz() if comptype == "bz2": try: import bz2 except ImportError: raise CompressionError("bz2 module is not available") if mode == "r": self.dbuf = "" self.cmp = bz2.BZ2Decompressor() else: self.cmp = bz2.BZ2Compressor() if comptype == "xz": try: import lzma except ImportError: raise CompressionError("lzma module is not available") if mode == "r": self.dbuf = "" self.cmp = lzma.LZMADecompressor() else: self.cmp = lzma.LZMACompressor()