public Compressor getCompressor() throws IOException { CompressionCodec codec = getCodec(); if (codec != null) { Compressor compressor = CodecPool.getCompressor(codec); if (compressor != null) { if (compressor.finished()) { // Somebody returns the compressor to CodecPool but is still using // it. LOG.warn("Compressor obtained from CodecPool already finished()"); } else { if(LOG.isDebugEnabled()) { LOG.debug("Got a compressor: " + compressor.hashCode()); } } /** * Following statement is necessary to get around bugs in 0.18 where a * compressor is referenced after returned back to the codec pool. */ compressor.reset(); } return compressor; } return null; }
/**
 * Runs several compress/decompress round trips through the zlib
 * compressor/decompressor obtained from {@link ZlibFactory}, then
 * re-initialises the compressor from the configuration.
 *
 * When native zlib is not reported as loaded, the fallback path asserts the
 * same flag again, so the test is expected to fail in that case.
 */
@Test
public void testZlibCompressorDecompressorWithConfiguration() {
  final Configuration conf = new Configuration();

  if (!ZlibFactory.isNativeZlibLoaded(conf)) {
    assertTrue("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));
    return;
  }

  final int rounds = 5;
  final int payloadSize = 10 * 1024;
  Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
  Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
  byte[] payload = generate(payloadSize);

  try {
    int round = 0;
    while (round < rounds) {
      compressDecompressZlib(payload, (ZlibCompressor) zlibCompressor,
          (ZlibDecompressor) zlibDecompressor);
      round++;
    }
    // Re-initialisation happens once, after all round trips.
    zlibCompressor.reinit(conf);
  } catch (Exception ex) {
    fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
  }
}
/**
 * Same round-trip check as the plain configuration test, but with
 * {@code zlib.compress.level} set to {@code FOUR} in the configuration
 * before the compressor and decompressor are requested.
 *
 * When native zlib is not reported as loaded, the fallback path asserts the
 * same flag again, so the test is expected to fail in that case.
 */
@Test
public void testZlibCompressorDecompressorWithCompressionLevels() {
  Configuration conf = new Configuration();
  conf.set("zlib.compress.level", "FOUR");

  if (ZlibFactory.isNativeZlibLoaded(conf)) {
    int tryNumber = 5;
    int BYTE_SIZE = 10 * 1024;
    Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
    Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
    byte[] rawData = generate(BYTE_SIZE);
    try {
      for (int i = 0; i < tryNumber; i++) {
        compressDecompressZlib(rawData, (ZlibCompressor) zlibCompressor,
            (ZlibDecompressor) zlibDecompressor);
      }
      // Re-initialisation happens once, after all round trips.
      zlibCompressor.reinit(conf);
    } catch (Exception ex) {
      // Report under this test's own name (previously copy-pasted from the
      // WithConfiguration test, which made failures misleading).
      fail("testZlibCompressorDecompressorWithCompressionLevels ex error " + ex);
    }
  } else {
    assertTrue("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));
  }
}
/**
 * Runs the shared setDictionary argument checks (null-pointer and
 * array-index-out-of-bounds helpers) against the zlib compressor and
 * decompressor obtained from {@link ZlibFactory}.
 *
 * When native zlib is not reported as loaded, the fallback path asserts the
 * same flag again, so the test is expected to fail in that case.
 */
@Test
public void testZlibCompressorDecompressorSetDictionary() {
  final Configuration conf = new Configuration();

  if (!ZlibFactory.isNativeZlibLoaded(conf)) {
    assertTrue("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));
    return;
  }

  Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
  Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);

  // Null-argument checks: compressor first, then decompressor.
  checkSetDictionaryNullPointerException(zlibCompressor);
  checkSetDictionaryNullPointerException(zlibDecompressor);

  // Bounds checks: decompressor first, then compressor.
  checkSetDictionaryArrayIndexOutOfBoundsException(zlibDecompressor);
  checkSetDictionaryArrayIndexOutOfBoundsException(zlibCompressor);
}
/**
 * Creates the file at {@code path} and returns a pretty-printing UTF-8 JSON
 * generator that writes to it. When the codec factory resolves a codec for
 * the path, the file stream is wrapped in that codec's compression stream
 * using a compressor borrowed from the {@link CodecPool}.
 *
 * If anything fails after the file stream or compressor has been acquired,
 * the stream is closed and the compressor is handed back to the pool before
 * the exception propagates.
 *
 * NOTE(review): on success the borrowed compressor is not returned to the
 * pool by this method - confirm it is released elsewhere.
 *
 * @param conf configuration used to resolve the file system and codec
 * @param path destination file
 * @return a JSON generator writing to {@code path}
 * @throws IOException if the file system, stream or generator cannot be set up
 */
private JsonGenerator createJsonGenerator(Configuration conf, Path path)
    throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);

  Compressor compressor = null;
  OutputStream output = null;
  boolean created = false;
  try {
    if (codec != null) {
      compressor = CodecPool.getCompressor(codec);
      output = outFS.create(path);
      output = codec.createOutputStream(output, compressor);
    } else {
      output = outFS.create(path);
    }

    JsonGenerator outGen =
        outFactory.createJsonGenerator(output, JsonEncoding.UTF8);
    outGen.useDefaultPrettyPrinter();
    created = true;
    return outGen;
  } finally {
    if (!created) {
      // Failure path only: release what was acquired so far.
      if (output != null) {
        try {
          output.close();
        } catch (IOException ignored) {
          // Best effort; the original failure is the one worth reporting.
        }
      }
      if (compressor != null) {
        CodecPool.returnCompressor(compressor);
      }
    }
  }
}
/**
 * Enables the native-library flag in the configuration, then runs several
 * compress/decompress round trips through the zlib compressor/decompressor
 * obtained from {@link ZlibFactory} and finally re-initialises the
 * compressor from the configuration.
 *
 * When native zlib is not reported as loaded, the fallback path asserts the
 * same flag again, so the test is expected to fail in that case.
 */
@Test
public void testZlibCompressorDecompressorWithConfiguration() {
  final Configuration conf = new Configuration();
  conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);

  if (!ZlibFactory.isNativeZlibLoaded(conf)) {
    assertTrue("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));
    return;
  }

  final int rounds = 5;
  final int payloadSize = 10 * 1024;
  Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
  Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);
  byte[] payload = generate(payloadSize);

  try {
    int round = 0;
    while (round < rounds) {
      compressDecompressZlib(payload, (ZlibCompressor) zlibCompressor,
          (ZlibDecompressor) zlibDecompressor);
      round++;
    }
    // Re-initialisation happens once, after all round trips.
    zlibCompressor.reinit(conf);
  } catch (Exception ex) {
    fail("testZlibCompressorDecompressorWithConfiguration ex error " + ex);
  }
}
/**
 * Enables the native-library flag in the configuration, then runs the shared
 * setDictionary argument checks (null-pointer and array-index-out-of-bounds
 * helpers) against the zlib compressor and decompressor obtained from
 * {@link ZlibFactory}.
 *
 * When native zlib is not reported as loaded, the fallback path asserts the
 * same flag again, so the test is expected to fail in that case.
 */
@Test
public void testZlibCompressorDecompressorSetDictionary() {
  final Configuration conf = new Configuration();
  conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, true);

  if (!ZlibFactory.isNativeZlibLoaded(conf)) {
    assertTrue("ZlibFactory is using native libs against request",
        ZlibFactory.isNativeZlibLoaded(conf));
    return;
  }

  Compressor zlibCompressor = ZlibFactory.getZlibCompressor(conf);
  Decompressor zlibDecompressor = ZlibFactory.getZlibDecompressor(conf);

  // Null-argument checks: compressor first, then decompressor.
  checkSetDictionaryNullPointerException(zlibCompressor);
  checkSetDictionaryNullPointerException(zlibDecompressor);

  // Bounds checks: decompressor first, then compressor.
  checkSetDictionaryArrayIndexOutOfBoundsException(zlibDecompressor);
  checkSetDictionaryArrayIndexOutOfBoundsException(zlibCompressor);
}
public static void testCompression(Compression.Algorithm algo) throws IOException { if (compressionTestResults[algo.ordinal()] != null) { if (compressionTestResults[algo.ordinal()]) { return ; // already passed test, dont do it again. } else { // failed. throw new DoNotRetryIOException("Compression algorithm '" + algo.getName() + "'" + " previously failed test."); } } try { Compressor c = algo.getCompressor(); algo.returnCompressor(c); compressionTestResults[algo.ordinal()] = true; // passes } catch (Throwable t) { compressionTestResults[algo.ordinal()] = false; // failure throw new DoNotRetryIOException(t); } }
public Compressor getCompressor() { CompressionCodec codec = getCodec(conf); if (codec != null) { Compressor compressor = CodecPool.getCompressor(codec); if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool."); if (compressor != null) { if (compressor.finished()) { // Somebody returns the compressor to CodecPool but is still using it. LOG.warn("Compressor obtained from CodecPool is already finished()"); } compressor.reset(); } return compressor; } return null; }
/**
 * Measures how many bytes the given slice of {@code inputBuffer} occupies
 * after being written through {@code algo}'s compression stream. The
 * compressed bytes go to an {@code IOUtils.NullOutputStream}; only the byte
 * count recorded by the wrapping {@link DataOutputStream} is used.
 *
 * @param algo compression algorithm that creates the compression stream
 * @param compressor compressor already requested from the codec; reset
 *        before use when non-null
 * @param inputBuffer array holding the data to compress
 * @param offset offset of the first byte to compress
 * @param length number of bytes to compress
 * @return number of bytes written to the sink after the flush
 * @throws IOException if creating, writing, flushing or closing the
 *         compression stream fails
 */
public static int getCompressedSize(Algorithm algo, Compressor compressor,
    byte[] inputBuffer, int offset, int length) throws IOException {
  final DataOutputStream byteCounter =
      new DataOutputStream(new IOUtils.NullOutputStream());

  if (null != compressor) {
    compressor.reset();
  }

  OutputStream compressedSink = null;
  try {
    compressedSink = algo.createCompressionStream(byteCounter, compressor, 0);
    compressedSink.write(inputBuffer, offset, length);
    compressedSink.flush();
    // The size is read after the flush and before the stream is closed.
    return byteCounter.size();
  } finally {
    if (null != compressedSink) {
      compressedSink.close();
    }
  }
}
/**
 * Builds the LZO compression pipeline on top of {@code downStream}.
 *
 * @param downStream stream that receives the compressed bytes
 * @param compressor compressor handed to the codec's output stream
 * @param downStreamBufferSize when positive, {@code downStream} is wrapped
 *        in a buffer of this size first
 * @return a buffered stream (of {@code DATA_OBUF_SIZE}) feeding the
 *         compression stream
 * @throws IOException if the codec is not supported or the codec stream
 *         cannot be created
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZO codec class not specified. Did you forget to set property "
            + CONF_LZO_CLASS + "?");
  }

  OutputStream target = downStream;
  if (downStreamBufferSize > 0) {
    target = new BufferedOutputStream(downStream, downStreamBufferSize);
  }

  // The LZO buffer size is set to 64 KiB on every call, before the codec
  // stream is created.
  conf.setInt("io.compression.codec.lzo.buffersize", 64 * 1024);

  CompressionOutputStream compressed =
      codec.createOutputStream(target, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Builds the compression pipeline for this codec on top of {@code downStream}.
 *
 * @param downStream stream that receives the compressed bytes
 * @param compressor compressor handed to the codec's output stream
 * @param downStreamBufferSize when positive, {@code downStream} is wrapped
 *        in a buffer of this size first
 * @return a buffered stream (of {@code DATA_OBUF_SIZE}) feeding the
 *         compression stream
 * @throws IOException if the codec stream cannot be created
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  final OutputStream target = (downStreamBufferSize > 0)
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;

  // The codec's own configuration gets a 32 KiB file buffer size on every
  // call, before the codec stream is created.
  codec.getConf().setInt("io.file.buffer.size", 32 * 1024);

  CompressionOutputStream compressed =
      codec.createOutputStream(target, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Creates a compression stream over {@code output}, using a compressor taken
 * from the queue when this pool manages compressors.
 *
 * If no compressor becomes available within the timeout, a new one is
 * created when adaptive increment is enabled; otherwise {@code null} is
 * returned. The stream/compressor pair is remembered so the compressor can
 * be released later, and the status counter is set to the number of
 * compressors in use after this call.
 *
 * @param output stream that receives the compressed bytes
 * @param timeout how long to wait for a free compressor
 * @param unit unit of {@code timeout}
 * @return the compression stream, or {@code null} when no compressor could
 *         be obtained in time and adaptive increment is off
 * @throws IOException if the codec cannot create the stream
 * @throws InterruptedException if interrupted while waiting for a compressor
 */
@Override
public CompressionOutputStream create(OutputStream output, long timeout,
    TimeUnit unit) throws IOException, InterruptedException {
  if (!hasCompressors) {
    return codec.createOutputStream(output);
  }

  Compressor compressor = compressorQueue.poll(timeout, unit);
  if (compressor == null) {
    if (!adaptiveIncrement) {
      return null;
    }
    LOG.info("Adaptive increment, creating new compressor");
    compressor = codec.createCompressor();
  }

  CompressionOutputStream cout = null;
  boolean created = false;
  try {
    cout = codec.createOutputStream(output, compressor);
    created = true;
  } finally {
    if (!created) {
      // Do not lose the compressor from the pool when stream creation fails.
      compressor.reset();
      compressorQueue.offer(compressor);
    }
  }

  usedCompressors.put(cout, compressor);
  // incrementAndGet so the counter shows the count including this stream,
  // matching the decrementAndGet used when a stream is released.
  status.setCounter(COMPRESSOR_STR, compressorsUsedCount.incrementAndGet());
  return cout;
}
@Override public void closeAndRelease(CompressionOutputStream cout) { try { // finish quietly cout.finish(); } catch (IOException ioexp) { LOG.error(ioexp.toString(), ioexp); } IOUtils.closeQuietly(cout); if (hasCompressors) { Compressor comp = usedCompressors.remove(cout); comp.reset(); compressorQueue.offer(comp); status.setCounter(COMPRESSOR_STR, compressorsUsedCount.decrementAndGet()); } }
/**
 * Copies {@code source} into {@code dest}, reading through the codec's
 * input stream and writing through its output stream, and returns the
 * output stream still open.
 *
 * The input side is always closed before returning. The output side is
 * closed only when the copy fails; on success it is left open for the
 * caller.
 *
 * @param source file to read from
 * @param dest file to write to
 * @param codec codec used to create both compression streams
 * @param compressor compressor for the output stream; may be null, in which
 *        case the codec creates the stream without one
 * @param decomp decompressor for the input stream; may be null, in which
 *        case the codec creates the stream without one
 * @param mark forwarded unchanged to the stream-level {@code copy}
 * @return the open compression stream writing to {@code dest}
 * @throws IOException if a stream cannot be opened or the copy fails
 */
public static final CompressionOutputStream copy(File source, File dest,
    CompressionCodec codec, Compressor compressor, Decompressor decomp,
    long mark) throws IOException {
  FileInputStream fileInput = null;
  CompressionInputStream in = null;
  FileOutputStream fileOut = null;
  CompressionOutputStream out = null;
  boolean copied = false;

  try {
    // Everything is opened inside the try so a failure part-way through
    // still releases whatever was opened before it.
    fileInput = new FileInputStream(source);
    in = (decomp == null)
        ? codec.createInputStream(fileInput)
        : codec.createInputStream(fileInput, decomp);

    fileOut = new FileOutputStream(dest);
    out = (compressor == null)
        ? codec.createOutputStream(fileOut)
        : codec.createOutputStream(fileOut, compressor);

    copy(in, out, mark);
    copied = true;
    return out;
  } finally {
    IOUtils.closeQuietly(in);
    IOUtils.closeQuietly(fileInput);
    if (!copied) {
      // The caller never receives the output stream on failure.
      IOUtils.closeQuietly(out);
      IOUtils.closeQuietly(fileOut);
    }
  }
}
public static void testCompression(Compression.Algorithm algo) throws IOException { if (compressionTestResults[algo.ordinal()] != null) { if (compressionTestResults[algo.ordinal()]) { return ; // already passed test, dont do it again. } else { // failed. throw new IOException("Compression algorithm '" + algo.getName() + "'" + " previously failed test."); } } Configuration conf = HBaseConfiguration.create(); try { Compressor c = algo.getCompressor(); algo.returnCompressor(c); compressionTestResults[algo.ordinal()] = true; // passes } catch (Throwable t) { compressionTestResults[algo.ordinal()] = false; // failure throw new IOException(t); } }
/**
 * Builds the buffered compression pipeline on top of {@code downStream}.
 *
 * @param downStream stream that receives the compressed bytes
 * @param compressor compressor handed to the plain compression stream
 * @param downStreamBufferSize when positive, {@code downStream} is wrapped
 *        in a buffer of this size first
 * @return a buffered stream (of {@code DATA_OBUF_SIZE}) feeding the
 *         compression stream
 * @throws IOException if the plain compression stream cannot be created
 */
public OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  final OutputStream target = (downStreamBufferSize > 0)
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;

  CompressionOutputStream compressed =
      createPlainCompressionStream(target, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
public Compressor getCompressor() { CompressionCodec codec = getCodec(conf); if (codec != null) { Compressor compressor = CodecPool.getCompressor(codec); if (compressor != null) { if (compressor.finished()) { // Somebody returns the compressor to CodecPool but is still using // it. LOG .warn("Compressor obtained from CodecPool is already finished()"); // throw new AssertionError( // "Compressor obtained from CodecPool is already finished()"); } compressor.reset(); } return compressor; } return null; }
/** * Find the size of compressed data assuming that buffer will be compressed * using given algorithm. * @param compressor Algorithm used for compression. * @param buffer Array to be compressed. * @param offset Offset to beginning of the data. * @param length Length to be compressed. * @return Size of compressed data in bytes. */ public static int checkCompressedSize(Compressor compressor, byte[] buffer, int offset, int length) { byte[] compressedBuffer = new byte[buffer.length]; // in fact the buffer could be of any positive size compressor.setInput(buffer, offset, length); compressor.finish(); int currentPos = 0; while (!compressor.finished()) { try { // we don't care about compressed data, // we just want to callculate number of bytes currentPos += compressor.compress(compressedBuffer, 0, compressedBuffer.length); } catch (IOException e) { throw new RuntimeException( "For some reason compressor couldn't read data. " + "It is likely a problem with " + compressor.getClass().getName(), e); } } return currentPos; }
/**
 * Builds the LZO compression pipeline on top of {@code downStream}.
 *
 * @param downStream stream that receives the compressed bytes
 * @param compressor compressor handed to the codec's output stream
 * @param downStreamBufferSize when positive, {@code downStream} is wrapped
 *        in a buffer of this size first
 * @return a buffered stream (of {@code DATA_OBUF_SIZE}) feeding the
 *         compression stream
 * @throws IOException if the codec is not supported or the codec stream
 *         cannot be created
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZO codec class not specified. Did you forget to set property "
            + CONF_LZO_CLASS + "?");
  }

  OutputStream target = downStream;
  if (downStreamBufferSize > 0) {
    target = new BufferedOutputStream(downStream, downStreamBufferSize);
  }

  // The LZO buffer size key is set to its default constant on every call,
  // before the codec stream is created.
  conf.setInt(IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
      IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);

  CompressionOutputStream compressed =
      codec.createOutputStream(target, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}
/**
 * Builds the LZMA compression pipeline on top of {@code downStream}.
 *
 * @param downStream stream that receives the compressed bytes
 * @param compressor compressor handed to the codec's output stream
 * @param downStreamBufferSize when positive, {@code downStream} is wrapped
 *        in a buffer of this size first
 * @return a buffered stream (of {@code DATA_OBUF_SIZE}) feeding the
 *         compression stream
 * @throws IOException if the codec is not supported or the codec stream
 *         cannot be created
 */
@Override
public synchronized OutputStream createCompressionStream(
    OutputStream downStream, Compressor compressor,
    int downStreamBufferSize) throws IOException {
  if (!isSupported()) {
    throw new IOException(
        "LZMA codec cannot be loaded. " +
        "You may want to check LD_LIBRARY_PATH.");
  }

  final OutputStream target = (downStreamBufferSize > 0)
      ? new BufferedOutputStream(downStream, downStreamBufferSize)
      : downStream;

  // The LZMA buffer size is set to 64 KiB on every call, before the codec
  // stream is created.
  conf.setInt("io.compression.codec.lzma.buffersize", 64 * 1024);

  CompressionOutputStream compressed =
      codec.createOutputStream(target, compressor);
  return new BufferedOutputStream(
      new FinishOnFlushCompressionStream(compressed), DATA_OBUF_SIZE);
}