public Decompressor getDecompressor() throws IOException { CompressionCodec codec = getCodec(); if (codec != null) { Decompressor decompressor = CodecPool.getDecompressor(codec); if (decompressor != null) { if (decompressor.finished()) { // Somebody returns the decompressor to CodecPool but is still using // it. LOG.warn("Deompressor obtained from CodecPool already finished()"); } else { if(LOG.isDebugEnabled()) { LOG.debug("Got a decompressor: " + decompressor.hashCode()); } } /** * Following statement is necessary to get around bugs in 0.18 where a * decompressor is referenced after returned back to the codec pool. */ decompressor.reset(); } return decompressor; } return null; }
/**
 * Round-trips random data through the native zlib compressor/decompressor
 * several times, then reinitializes the compressor from the configuration.
 */
@Test
public void testZlibCompressorDecompressorWithConfiguration() {
  Configuration conf = new Configuration();
  if (!ZlibFactory.isNativeZlibLoaded(conf)) {
    assertTrue("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));
    return;
  }
  final int attempts = 5;
  final int dataSize = 10 * 1024;
  Compressor compressor = ZlibFactory.getZlibCompressor(conf);
  Decompressor decompressor = ZlibFactory.getZlibDecompressor(conf);
  byte[] rawData = generate(dataSize);
  try {
    for (int attempt = 0; attempt < attempts; attempt++) {
      compressDecompressZlib(rawData, (ZlibCompressor) compressor,
          (ZlibDecompressor) decompressor);
    }
    compressor.reinit(conf);
  } catch (Exception ex) {
    fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
  }
}
/**
 * Round-trips random data through the native zlib codec with an explicit
 * compression level ("FOUR") set via {@code zlib.compress.level}.
 */
@Test
public void testZlibCompressorDecompressorWithCompressionLevels() {
  Configuration conf = new Configuration();
  conf.set("zlib.compress.level", "FOUR");
  if (ZlibFactory.isNativeZlibLoaded(conf)) {
    byte[] rawData;
    int tryNumber = 5;
    int BYTE_SIZE = 10 * 1024;
    Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
    Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
    rawData = generate(BYTE_SIZE);
    try {
      for (int i = 0; i < tryNumber; i++) {
        compressDecompressZlib(rawData, (ZlibCompressor) zlibCompressor,
            (ZlibDecompressor) zlibDecompressor);
      }
      zlibCompressor.reinit(conf);
    } catch (Exception ex) {
      // Fixed: the failure message previously named the wrong test
      // (testZlibCompressorDecompressorWithConfiguration), which misleads
      // anyone triaging a failed run.
      fail("testZlibCompressorDecompressorWithCompressionLevels ex error " + ex);
    }
  } else {
    assertTrue("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));
  }
}
/**
 * Verifies setDictionary error handling: null dictionaries must raise
 * NullPointerException and out-of-range offsets must raise
 * ArrayIndexOutOfBoundsException, on both compressor and decompressor.
 */
@Test
public void testZlibCompressorDecompressorSetDictionary() {
  Configuration conf = new Configuration();
  if (!ZlibFactory.isNativeZlibLoaded(conf)) {
    assertTrue("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));
    return;
  }
  Compressor compressor = ZlibFactory.getZlibCompressor(conf);
  Decompressor decompressor = ZlibFactory.getZlibDecompressor(conf);
  // Null dictionary rejected on both sides.
  checkSetDictionaryNullPointerException(compressor);
  checkSetDictionaryNullPointerException(decompressor);
  // Out-of-bounds dictionary region rejected on both sides.
  checkSetDictionaryArrayIndexOutOfBoundsException(decompressor);
  checkSetDictionaryArrayIndexOutOfBoundsException(compressor);
}
/**
 * Same round-trip exercise as the basic configuration test, but with native
 * library usage explicitly requested via IO_NATIVE_LIB_AVAILABLE_KEY.
 */
@Test
public void testZlibCompressorDecompressorWithConfiguration() {
  Configuration conf = new Configuration();
  conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
  if (!ZlibFactory.isNativeZlibLoaded(conf)) {
    assertTrue("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));
    return;
  }
  final int attempts = 5;
  final int dataSize = 10 * 1024;
  Compressor compressor = ZlibFactory.getZlibCompressor(conf);
  Decompressor decompressor = ZlibFactory.getZlibDecompressor(conf);
  byte[] rawData = generate(dataSize);
  try {
    for (int attempt = 0; attempt < attempts; attempt++) {
      compressDecompressZlib(rawData, (ZlibCompressor) compressor,
          (ZlibDecompressor) decompressor);
    }
    compressor.reinit(conf);
  } catch (Exception ex) {
    fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
  }
}
/**
 * setDictionary error-handling checks (NPE for null dictionaries, AIOOBE for
 * bad bounds) with native library usage explicitly requested.
 */
@Test
public void testZlibCompressorDecompressorSetDictionary() {
  Configuration conf = new Configuration();
  conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);
  if (!ZlibFactory.isNativeZlibLoaded(conf)) {
    assertTrue("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));
    return;
  }
  Compressor compressor = ZlibFactory.getZlibCompressor(conf);
  Decompressor decompressor = ZlibFactory.getZlibDecompressor(conf);
  // Null dictionary rejected on both sides.
  checkSetDictionaryNullPointerException(compressor);
  checkSetDictionaryNullPointerException(decompressor);
  // Out-of-bounds dictionary region rejected on both sides.
  checkSetDictionaryArrayIndexOutOfBoundsException(decompressor);
  checkSetDictionaryArrayIndexOutOfBoundsException(compressor);
}
public Decompressor getDecompressor() { CompressionCodec codec = getCodec(conf); if (codec != null) { Decompressor decompressor = CodecPool.getDecompressor(codec); if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor + " from pool."); if (decompressor != null) { if (decompressor.finished()) { // Somebody returns the decompressor to CodecPool but is still using it. LOG.warn("Deompressor obtained from CodecPool is already finished()"); } decompressor.reset(); } return decompressor; } return null; }
/**
 * Decompresses data from the given stream using the configured compression algorithm. It will
 * throw an exception if the dest buffer does not have enough space to hold the decompressed data.
 * @param dest the output bytes buffer
 * @param destOffset start writing position of the output buffer
 * @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact amount
 *          of compressed data
 * @param compressedSize compressed data size, header not included
 * @param uncompressedSize uncompressed data size, header not included
 * @param compressAlgo compression algorithm used
 * @throws IOException if reading or decompressing fails
 * @throws IllegalArgumentException if the destination buffer is too small
 */
public static void decompress(byte[] dest, int destOffset, InputStream bufferedBoundedStream,
    int compressedSize, int uncompressedSize, Compression.Algorithm compressAlgo)
    throws IOException {
  if (dest.length - destOffset < uncompressedSize) {
    throw new IllegalArgumentException("Output buffer does not have enough space to hold "
        + uncompressedSize + " decompressed bytes, available: " + (dest.length - destOffset));
  }
  Decompressor decompressor = null;
  try {
    decompressor = compressAlgo.getDecompressor();
    InputStream is =
        compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0);
    try {
      IOUtils.readFully(is, dest, destOffset, uncompressedSize);
    } finally {
      // Close in finally so the decompression stream is released even when
      // readFully throws (previously it leaked on error).
      is.close();
    }
  } finally {
    if (decompressor != null) {
      // Always return the decompressor so the pool does not leak.
      compressAlgo.returnDecompressor(decompressor);
    }
  }
}
/**
 * Builds a buffered decompression stream on top of {@code downStream} using
 * the configured LZO codec.
 *
 * @param downStream raw compressed input
 * @param decompressor decompressor instance to drive the codec
 * @param downStreamBufferSize if positive, wrap the raw input in a buffer of
 *          this size before decompressing
 * @return a BufferedInputStream over the decompressed data
 * @throws IOException when the LZO codec class is not configured
 */
@Override
public synchronized InputStream createDecompressionStream(InputStream downStream,
    Decompressor decompressor, int downStreamBufferSize) throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZO codec class not specified. Did you forget to set property "
            + CONF_LZO_CLASS + "?");
  }
  // Optionally buffer the raw compressed input first.
  InputStream source = (downStreamBufferSize > 0)
      ? new BufferedInputStream(downStream, downStreamBufferSize)
      : downStream;
  conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);
  CompressionInputStream decompressed = codec.createInputStream(source, decompressor);
  return new BufferedInputStream(decompressed, DATA_IBUF_SIZE);
}
/**
 * Copies {@code mark} bytes from a compressed source file to a compressed
 * destination file.
 *
 * @param source input file to read
 * @param dest output file to write
 * @param codec codec used for both input and output streams
 * @param compressor may be null (codec default is used)
 * @param decomp may be null (codec default is used)
 * @param mark number of bytes to copy
 * @return the open output stream on success; the CALLER must close it
 * @throws IOException if opening or copying fails
 */
public static final CompressionOutputStream copy(File source, File dest,
    CompressionCodec codec, Compressor compressor, Decompressor decomp, long mark)
    throws IOException {
  FileInputStream fileInput = new FileInputStream(source);
  CompressionInputStream in = null;
  FileOutputStream fileOut = null;
  CompressionOutputStream out = null;
  boolean success = false;
  try {
    in = (decomp == null)
        ? codec.createInputStream(fileInput)
        : codec.createInputStream(fileInput, decomp);
    fileOut = new FileOutputStream(dest);
    out = (compressor == null)
        ? codec.createOutputStream(fileOut)
        : codec.createOutputStream(fileOut, compressor);
    copy(in, out, mark);
    success = true;
    return out;
  } finally {
    IOUtils.closeQuietly(in);
    IOUtils.closeQuietly(fileInput);
    if (!success) {
      // Previously the output streams leaked when copy() threw; only the
      // success path hands `out` to the caller for closing.
      IOUtils.closeQuietly(out);
      IOUtils.closeQuietly(fileOut);
    }
  }
}
private InputStream openFile(Path path) throws IOException { CompressionCodec codec=new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path); FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path); // check if compressed if (codec==null) { // uncompressed return fileIn; } else { // compressed Decompressor decompressor = CodecPool.getDecompressor(codec); this.openDecompressors.add(decompressor); // to be returned later using close if (codec instanceof SplittableCompressionCodec) { long end = dfsCluster.getFileSystem().getFileStatus(path).getLen(); final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS); return cIn; } else { return codec.createInputStream(fileIn,decompressor); } } }
public InputStream openFile(Path path) throws IOException { CompressionCodec codec=compressionCodecs.getCodec(path); FSDataInputStream fileIn=fs.open(path); // check if compressed if (codec==null) { // uncompressed LOG.debug("Reading from an uncompressed file \""+path+"\""); return fileIn; } else { // compressed Decompressor decompressor = CodecPool.getDecompressor(codec); this.openDecompressors.add(decompressor); // to be returned later using close if (codec instanceof SplittableCompressionCodec) { LOG.debug("Reading from a compressed file \""+path+"\" with splittable compression codec"); long end = fs.getFileStatus(path).getLen(); return ((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS); } else { LOG.debug("Reading from a compressed file \""+path+"\" with non-splittable compression codec"); return codec.createInputStream(fileIn,decompressor); } } }
private InputStream openFile(Path path) throws IOException { CompressionCodec codec=new CompressionCodecFactory(conf).getCodec(path); FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path); // check if compressed if (codec==null) { // uncompressed return fileIn; } else { // compressed Decompressor decompressor = CodecPool.getDecompressor(codec); this.openDecompressors.add(decompressor); // to be returned later using close if (codec instanceof SplittableCompressionCodec) { long end = dfsCluster.getFileSystem().getFileStatus(path).getLen(); final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS); return cIn; } else { return codec.createInputStream(fileIn,decompressor); } } }
/**
 * Opens a buffered reader over {@code file}, transparently decompressing it
 * when a compression codec matches the file name.
 *
 * @param file local file to read
 * @param context job context supplying the configuration
 * @return a reader over the (possibly decompressed) file contents
 * @throws IOException if the file cannot be opened
 */
public static BufferedReader getBufferedReader(File file, MapredContext context)
    throws IOException {
  URI fileuri = file.toURI();
  Path path = new Path(fileuri);
  Configuration conf = context.getJobConf();
  CompressionCodecFactory ccf = new CompressionCodecFactory(conf);
  CompressionCodec codec = ccf.getCodec(path);
  if (codec == null) {
    return new BufferedReader(new FileReader(file));
  }
  Decompressor decompressor = CodecPool.getDecompressor(codec);
  FileInputStream fis = null;
  try {
    fis = new FileInputStream(file);
    CompressionInputStream cis = codec.createInputStream(fis, decompressor);
    // BufferedReaderExt keeps the decompressor so it can be returned to the
    // pool when the reader is closed.
    return new BufferedReaderExt(new InputStreamReader(cis), decompressor);
  } catch (IOException e) {
    // Previously a failure in createInputStream leaked both the file stream
    // and the pooled decompressor.
    if (fis != null) {
      fis.close();
    }
    CodecPool.returnDecompressor(decompressor);
    throw e;
  }
}
public Decompressor getDecompressor() { CompressionCodec codec = getCodec(conf); if (codec != null) { Decompressor decompressor = CodecPool.getDecompressor(codec); if (decompressor != null) { if (decompressor.finished()) { // Somebody returns the decompressor to CodecPool but is still using // it. LOG .warn("Deompressor obtained from CodecPool is already finished()"); // throw new AssertionError( // "Decompressor obtained from CodecPool is already finished()"); } decompressor.reset(); } return decompressor; } return null; }
/**
 * Decompresses data from the given stream using the configured compression algorithm.
 * @param dest the output bytes buffer
 * @param destOffset start writing position of the output buffer
 * @param bufferedBoundedStream a stream to read compressed data from, bounded to the exact
 *          amount of compressed data
 * @param uncompressedSize uncompressed data size, header not included
 * @throws IOException if reading or decompressing fails
 */
protected void decompress(byte[] dest, int destOffset, InputStream bufferedBoundedStream,
    int uncompressedSize) throws IOException {
  Decompressor decompressor = null;
  try {
    decompressor = compressAlgo.getDecompressor();
    InputStream is =
        compressAlgo.createDecompressionStream(bufferedBoundedStream, decompressor, 0);
    try {
      IOUtils.readFully(is, dest, destOffset, uncompressedSize);
    } finally {
      // Close in finally so the decompression stream is released even when
      // readFully throws (previously it leaked on error).
      is.close();
    }
  } finally {
    if (decompressor != null) {
      // Always return the decompressor so the pool does not leak.
      compressAlgo.returnDecompressor(decompressor);
    }
  }
}
/**
 * Builds a buffered decompression stream on top of {@code downStream} using
 * the configured LZMA codec.
 *
 * @param downStream raw compressed input
 * @param decompressor decompressor instance to drive the codec
 * @param downStreamBufferSize if positive, wrap the raw input in a buffer of
 *          this size before decompressing
 * @return a BufferedInputStream over the decompressed data
 * @throws IOException when the native LZMA codec is unavailable
 */
@Override
public synchronized InputStream createDecompressionStream(InputStream downStream,
    Decompressor decompressor, int downStreamBufferSize) throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZMA codec cannot be loaded. "
            + "You may want to check LD_LIBRARY_PATH.");
  }
  // Optionally buffer the raw compressed input first.
  InputStream source = (downStreamBufferSize > 0)
      ? new BufferedInputStream(downStream, downStreamBufferSize)
      : downStream;
  conf.setInt("io.compression.codec.lzma.buffersize", 64 * 1024);
  CompressionInputStream decompressed = codec.createInputStream(source, decompressor);
  return new BufferedInputStream(decompressed, DATA_IBUF_SIZE);
}
/**
 * Reads an entire compressed block region eagerly: the region is decompressed
 * into the shared {@code r.baos} buffer and the resulting bytes are kept in a
 * reusable in-memory stream ({@code rbain}) that serves as a block cache.
 *
 * @param compressionAlgo algorithm used to decompress the region
 * @param fsin underlying file stream positioned over the whole file
 * @param region offset/size of the compressed block within the file
 * @param conf configuration (supplies the FS input buffer size)
 * @param r reader owning the scratch {@code baos} buffer
 * @throws IOException if decompression or reading fails
 */
public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin,
    BlockRegion region, Configuration conf, Reader r) throws IOException {
  this.compressAlgo = compressionAlgo;
  Decompressor decompressor = compressionAlgo.getDecompressor();
  this.region = region;
  try {
    // Bound the raw stream to exactly the compressed region before
    // decompressing; the FS input buffer size is used for both the
    // decompression stream and the copy buffer below.
    InputStream in = compressAlgo.createDecompressionStream(
        new BoundedRangeFileInputStream(fsin, region.getOffset(),
            region.getCompressedSize()),
        decompressor, DTFile.getFSInputBufferSize(conf));
    int l = 1;
    r.baos.reset();
    byte[] buf = new byte[DTFile.getFSInputBufferSize(conf)];
    // Drain the decompression stream fully into the reader's scratch buffer.
    while (l >= 0) {
      l = in.read(buf);
      if (l > 0) {
        r.baos.write(buf, 0, l);
      }
    }
    // keep decompressed data into cache
    byte[] blockData = r.baos.toByteArray();
    rbain = new ReusableByteArrayInputStream(blockData);
    // NOTE(review): on the success path the decompressor is NOT returned to
    // the pool and `in` is never closed -- presumably intentional since the
    // data is fully drained, but verify against Algorithm's pooling contract.
  } catch (IOException e) {
    // On failure, at least hand the decompressor back before rethrowing.
    compressAlgo.returnDecompressor(decompressor);
    throw e;
  }
}
/**
 * This function opens a stream to read a compressed file. Stream is not
 * closed, the user has to close it when read is finished.
 *
 * @param filePath path of the compressed file
 * @param conf configuration used to resolve the codec and filesystem
 * @return a decompressing input stream, or {@code null} when no codec matches
 *         the path or opening fails (errors are logged, not thrown)
 */
public static InputStream openCompressedFile(Path filePath, Configuration conf) {
  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
  CompressionCodec codec = factory.getCodec(filePath);
  if (codec == null) {
    log.error("No codec found for file " + filePath.toString());
    return null;
  }
  FSDataInputStream raw = null;
  try {
    FileSystem fs = filePath.getFileSystem(conf);
    Decompressor decompressor = codec.createDecompressor();
    raw = fs.open(filePath);
    return codec.createInputStream(raw, decompressor);
  } catch (Exception e) {
    // Log with the full throwable so the stack trace is preserved in the log
    // (previously printStackTrace() dumped it to stderr and the cause was
    // reduced to its message).
    log.error("Error opening compressed file: " + e.getMessage(), e);
    // Don't leak the raw stream when wrapping it fails.
    if (raw != null) {
      try {
        raw.close();
      } catch (Exception ignored) {
        // best-effort cleanup; original error already logged
      }
    }
  }
  return null;
}
/**
 * Creates a CSV record reader for the given split, decoding with the charset
 * from {@code CHARSET} (UTF-8 by default) and transparently decompressing the
 * input when a codec matches the file name.
 *
 * @param inputSplit the file split to read (cast to {@link FileSplit})
 * @param conf job configuration supplying charset, format, and strict-mode
 * @param reporter unused
 * @return a reader producing one {@code ListWritable<Text>} row per record
 * @throws IOException if the file cannot be opened
 */
@Override
public RecordReader<LongWritable, ListWritable<Text>> getRecordReader(InputSplit inputSplit,
    JobConf conf, Reporter reporter) throws IOException {
  // Charset defaults to UTF-8 unless overridden via the CHARSET key.
  String charsetName = conf.get(CHARSET);
  Charset charset = charsetName != null ? Charset.forName(charsetName) : StandardCharsets.UTF_8;
  FileSplit split = (FileSplit) inputSplit;
  Path path = split.getPath();
  FileSystem fs = path.getFileSystem(conf);
  InputStream is = fs.open(path);
  // If the input is compressed, load the compression codec.
  CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
  CompressionCodec codec = codecFactory.getCodec(path);
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    // NOTE(review): the pooled decompressor is never handed back via
    // CodecPool.returnDecompressor() here; presumably CsvRecordReader.close()
    // is expected to do it -- verify, otherwise the pool grows by one
    // decompressor per split.
    is = codec.createInputStream(is, decompressor);
  }
  return new CsvRecordReader(new InputStreamReader(is, charset), createFormat(conf),
      split.getLength(), conf.getBoolean(STRICT_MODE, true));
}