/**
 * Returns an {@link OutputStream} for a file that might need
 * compression.
 */
static OutputStream getPossiblyCompressedOutputStream(Path file,
                                                      Configuration conf)
    throws IOException {
  FileSystem fs = file.getFileSystem(conf);
  JobConf jConf = new JobConf(conf);
  if (org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(jConf)) {
    // get the codec class
    Class<? extends CompressionCodec> codecClass =
      org.apache.hadoop.mapred.FileOutputFormat
                              .getOutputCompressorClass(jConf, GzipCodec.class);
    // get the codec implementation
    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);

    // add the appropriate extension
    file = file.suffix(codec.getDefaultExtension());

    if (isCompressionEmulationEnabled(conf)) {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new DataOutputStream(codec.createOutputStream(fileOut));
    }
  }
  return fs.create(file, false);
}
public Compressor getCompressor() {
  CompressionCodec codec = getCodec(conf);
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returned the compressor to the CodecPool but is still using it.
        LOG.warn("Compressor obtained from CodecPool is already finished()");
      }
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
/**
 * Clones the attributes (like compression) of the input file and creates a
 * corresponding Writer.
 * @param inputFile the path of the input file whose attributes should be
 * cloned
 * @param outputFile the path of the output file
 * @param prog the Progressable to report status during the file write
 * @return Writer
 * @throws IOException
 */
public Writer cloneFileAttributes(Path inputFile, Path outputFile,
                                  Progressable prog) throws IOException {
  Reader reader = new Reader(conf,
                             Reader.file(inputFile),
                             new Reader.OnlyHeaderOption());
  CompressionType compress = reader.getCompressionType();
  CompressionCodec codec = reader.getCompressionCodec();
  reader.close();

  Writer writer = createWriter(conf,
                               Writer.file(outputFile),
                               Writer.keyClass(keyClass),
                               Writer.valueClass(valClass),
                               Writer.compression(compress, codec),
                               Writer.progressable(prog));
  return writer;
}
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returned the compressor to the CodecPool but is still
        // using it.
        LOG.warn("Compressor obtained from CodecPool already finished()");
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got a compressor: " + compressor.hashCode());
        }
      }
      /**
       * The following statement is necessary to get around bugs in 0.18
       * where a compressor is referenced after being returned to the codec
       * pool.
       */
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
/**
 * Get the {@link CompressionCodec} for compressing the job outputs.
 * @param conf the {@link JobConf} to look in
 * @param defaultValue the {@link CompressionCodec} to return if not set
 * @return the {@link CompressionCodec} to be used to compress the
 *         job outputs
 * @throws IllegalArgumentException if the class was specified, but not found
 */
public static Class<? extends CompressionCodec>
getOutputCompressorClass(JobConf conf,
                         Class<? extends CompressionCodec> defaultValue) {
  Class<? extends CompressionCodec> codecClass = defaultValue;
  String name = conf.get(org.apache.hadoop.mapreduce.lib.output.
    FileOutputFormat.COMPRESS_CODEC);
  if (name != null) {
    try {
      codecClass = conf.getClassByName(name).asSubclass(CompressionCodec.class);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Compression codec " + name +
                                         " was not found.", e);
    }
  }
  return codecClass;
}
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returned the decompressor to the CodecPool but is still
        // using it.
        LOG.warn("Decompressor obtained from CodecPool already finished()");
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Got a decompressor: " + decompressor.hashCode());
        }
      }
      /**
       * The following statement is necessary to get around bugs in 0.18
       * where a decompressor is referenced after being returned to the codec
       * pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }
  return null;
}
private JsonGenerator createJsonGenerator(Configuration conf, Path path)
    throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
    new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(outFS.create(path), compressor);
  } else {
    output = outFS.create(path);
  }

  JsonGenerator outGen = outFactory.createJsonGenerator(output,
                                                        JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();

  return outGen;
}
@SuppressWarnings("deprecation") private void writeMetadataTest(FileSystem fs, int count, int seed, Path file, CompressionType compressionType, CompressionCodec codec, SequenceFile.Metadata metadata) throws IOException { fs.delete(file, true); LOG.info("creating " + count + " records with metadata and with " + compressionType + " compression"); SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, file, RandomDatum.class, RandomDatum.class, compressionType, codec, null, metadata); RandomDatum.Generator generator = new RandomDatum.Generator(seed); for (int i = 0; i < count; i++) { generator.next(); RandomDatum key = generator.getKey(); RandomDatum value = generator.getValue(); writer.append(key, value); } writer.close(); }
@Override
public void open(String filePath, CompressionCodec codeC,
                 CompressionType compType) throws IOException {
  Configuration conf = new Configuration();
  Path dstPath = new Path(filePath);
  FileSystem hdfs = dstPath.getFileSystem(conf);
  open(dstPath, codeC, compType, conf, hdfs);
}
protected void open(Path dstPath, CompressionCodec codeC,
                    CompressionType compType, Configuration conf,
                    FileSystem hdfs) throws IOException {
  if (useRawLocalFileSystem) {
    if (hdfs instanceof LocalFileSystem) {
      hdfs = ((LocalFileSystem) hdfs).getRaw();
    } else {
      logger.warn("useRawLocalFileSystem is set to true but file system " +
                  "is not of type LocalFileSystem: " + hdfs.getClass().getName());
    }
  }
  if (conf.getBoolean("hdfs.append.support", false) && hdfs.isFile(dstPath)) {
    outStream = hdfs.append(dstPath);
  } else {
    outStream = hdfs.create(dstPath);
  }
  writer = SequenceFile.createWriter(conf, outStream,
                                     serializer.getKeyClass(),
                                     serializer.getValueClass(),
                                     compType, codeC);
  registerCurrentStream(outStream, hdfs, dstPath);
}
/**
 * Construct an IFile Reader.
 *
 * @param conf Configuration File
 * @param in The input stream
 * @param length Length of the data in the stream, including the checksum
 *               bytes.
 * @param codec codec
 * @param readsCounter Counter for records read from disk
 * @throws IOException
 */
public Reader(Configuration conf, FSDataInputStream in, long length,
              CompressionCodec codec,
              Counters.Counter readsCounter) throws IOException {
  readRecordsCounter = readsCounter;
  checksumIn = new IFileInputStream(in, length, conf);
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      this.in = codec.createInputStream(checksumIn, decompressor);
    } else {
      LOG.warn("Could not obtain decompressor from CodecPool");
      this.in = checksumIn;
    }
  } else {
    this.in = checksumIn;
  }
  this.dataIn = new DataInputStream(this.in);
  this.fileLength = length;

  if (conf != null) {
    bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
  }
}
@Override
public void open(String filePath, CompressionCodec codeC,
                 CompressionType compType) throws IOException {
  super.open(filePath, codeC, compType);
  if (closed) {
    opened = true;
  }
}
protected void open(Path dstPath, CompressionCodec codeC,
                    CompressionType compType, Configuration conf,
                    FileSystem hdfs) throws IOException {
  if (useRawLocalFileSystem) {
    if (hdfs instanceof LocalFileSystem) {
      hdfs = ((LocalFileSystem) hdfs).getRaw();
    } else {
      logger.warn("useRawLocalFileSystem is set to true but file system " +
                  "is not of type LocalFileSystem: " + hdfs.getClass().getName());
    }
  }
  if (conf.getBoolean("hdfs.append.support", false) && hdfs.isFile(dstPath)) {
    outStream = hdfs.append(dstPath);
  } else {
    outStream = hdfs.create(dstPath);
  }
  writer = SequenceFile.createWriter(conf, outStream,
                                     serializer.getKeyClass(),
                                     serializer.getValueClass(),
                                     compType, codeC);
  registerCurrentStream(outStream, hdfs, dstPath);
}
/**
 * Get the {@link CompressionCodec} for compressing the job outputs.
 * @param job the {@link Job} to look in
 * @param defaultValue the {@link CompressionCodec} to return if not set
 * @return the {@link CompressionCodec} to be used to compress the
 *         job outputs
 * @throws IllegalArgumentException if the class was specified, but not found
 */
public static Class<? extends CompressionCodec>
getOutputCompressorClass(JobContext job,
                         Class<? extends CompressionCodec> defaultValue) {
  Class<? extends CompressionCodec> codecClass = defaultValue;
  Configuration conf = job.getConfiguration();
  String name = conf.get(FileOutputFormat.COMPRESS_CODEC);
  if (name != null) {
    try {
      codecClass =
        conf.getClassByName(name).asSubclass(CompressionCodec.class);
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Compression codec " + name +
                                         " was not found.", e);
    }
  }
  return codecClass;
}
public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
    throws IOException {
  CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
  CompressionCodec inputCodec = codecs.getCodec(inputPath);
  FileSystem ifs = inputPath.getFileSystem(conf);
  FSDataInputStream fileIn = ifs.open(inputPath);

  if (inputCodec == null) {
    decompressor = null;
    coreInputStream = fileIn;
  } else {
    decompressor = CodecPool.getDecompressor(inputCodec);
    coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
  }
}
/**
 * Given a codec name, instantiate the concrete implementation
 * class that implements it.
 * @throws com.cloudera.sqoop.io.UnsupportedCodecException if a codec cannot
 * be found with the supplied name.
 */
public static CompressionCodec getCodec(String codecName,
    Configuration conf) throws com.cloudera.sqoop.io.UnsupportedCodecException {
  // Try standard Hadoop mechanism first
  CompressionCodec codec = getCodecByName(codecName, conf);
  if (codec != null) {
    return codec;
  }

  // Fall back to Sqoop mechanism
  String codecClassName = null;
  try {
    codecClassName = getCodecClassName(codecName);
    if (null == codecClassName) {
      return null;
    }
    Class<? extends CompressionCodec> codecClass =
        (Class<? extends CompressionCodec>)
        conf.getClassByName(codecClassName);
    return (CompressionCodec) ReflectionUtils.newInstance(
        codecClass, conf);
  } catch (ClassNotFoundException cnfe) {
    throw new com.cloudera.sqoop.io.UnsupportedCodecException(
        "Cannot find codec class " + codecClassName + " for codec " + codecName);
  }
}
/**
 * Gets the short name for a specified codec. See {@link
 * #getCodecByName(String, Configuration)} for details. The name returned
 * here is the shortest possible one, meaning a trailing {@code Codec} part
 * is removed as well.
 *
 * @param codecName name of the codec to return the short name for
 * @param conf job configuration object used to get the registered
 *             compression codecs
 *
 * @return the short name of the codec
 *
 * @throws com.cloudera.sqoop.io.UnsupportedCodecException
 *           if no short name could be found
 */
public static String getCodecShortNameByName(String codecName,
    Configuration conf) throws com.cloudera.sqoop.io.UnsupportedCodecException {
  if (codecNames.containsKey(codecName)) {
    return codecName;
  }

  CompressionCodec codec = getCodecByName(codecName, conf);
  Class<? extends CompressionCodec> codecClass = null;
  if (codec != null) {
    codecClass = codec.getClass();
  }

  if (codecClass != null) {
    String simpleName = codecClass.getSimpleName();
    if (simpleName.endsWith("Codec")) {
      simpleName =
          simpleName.substring(0, simpleName.length() - "Codec".length());
    }
    return simpleName.toLowerCase();
  }

  throw new com.cloudera.sqoop.io.UnsupportedCodecException(
      "Cannot find codec class " + codecName + " for codec " + codecName);
}
public Decompressor getDecompressor() {
  CompressionCodec codec = getCodec(conf);
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor + " from pool.");
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returned the decompressor to the CodecPool but is still using it.
        LOG.warn("Decompressor obtained from CodecPool is already finished()");
      }
      decompressor.reset();
    }
    return decompressor;
  }
  return null;
}
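// A minimal companion sketch (not from the snippets above): the getters shown
// here only borrow from CodecPool, so callers are expected to hand the
// (de)compressor back once the stream using it is closed. The helper method
// names below are illustrative assumptions, not part of any of the classes
// excerpted here.
public void releaseCompressor(Compressor compressor) {
  if (compressor != null) {
    CodecPool.returnCompressor(compressor); // make it reusable by other callers
  }
}

public void releaseDecompressor(Decompressor decompressor) {
  if (decompressor != null) {
    CodecPool.returnDecompressor(decompressor);
  }
}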
@Override // Mapper
public void configure(JobConf conf) {
  super.configure(conf);

  // grab compression
  String compression = getConf().get("test.io.compression.class", null);
  Class<? extends CompressionCodec> codec;

  // try to initialize codec
  try {
    codec = (compression == null) ? null :
      Class.forName(compression).asSubclass(CompressionCodec.class);
  } catch (Exception e) {
    throw new RuntimeException("Compression codec not found: ", e);
  }

  if (codec != null) {
    compressionCodec = (CompressionCodec)
        ReflectionUtils.newInstance(codec, getConf());
  }
}
public static <K extends Object, V extends Object>
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          CompressionCodec codec,
                          List<Segment<K, V>> segments,
                          int mergeFactor, int inMemSegments, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          boolean sortSegments,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
                              sortSegments, codec, TaskType.REDUCE)
      .merge(keyClass, valueClass, mergeFactor, inMemSegments, tmpDir,
             readsCounter, writesCounter, mergePhase);
}
/**
 * Construct the preferred type of SequenceFile Writer.
 * @param fs The configured filesystem.
 * @param conf The configuration.
 * @param name The name of the file.
 * @param keyClass The 'key' type.
 * @param valClass The 'value' type.
 * @param bufferSize buffer size for the underlying outputstream.
 * @param replication replication factor for the file.
 * @param blockSize block size for the file.
 * @param createParent create parent directory if non-existent
 * @param compressionType The compression type.
 * @param codec The compression codec.
 * @param metadata The metadata of the file.
 * @return Returns the handle to the constructed SequenceFile Writer.
 * @throws IOException
 */
@Deprecated
public static Writer
createWriter(FileSystem fs, Configuration conf, Path name,
             Class keyClass, Class valClass, int bufferSize,
             short replication, long blockSize, boolean createParent,
             CompressionType compressionType, CompressionCodec codec,
             Metadata metadata) throws IOException {
  return createWriter(FileContext.getFileContext(fs.getUri(), conf),
      conf, name, keyClass, valClass, compressionType, codec,
      metadata, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
      CreateOpts.bufferSize(bufferSize),
      createParent ? CreateOpts.createParent()
                   : CreateOpts.donotCreateParent(),
      CreateOpts.repFac(replication),
      CreateOpts.blockSize(blockSize)
    );
}
protected boolean isSplitable(Configuration conf, Path file) {
  final CompressionCodec codec =
      new CompressionCodecFactory(conf).getCodec(file);
  if (null == codec) {
    return true;
  }
  return codec instanceof SplittableCompressionCodec;
}
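// A minimal standalone sketch of the same splittability check (the input path
// is a hypothetical example): a file is treated as splittable when no codec is
// registered for its extension, or when the registered codec implements
// SplittableCompressionCodec (e.g. BZip2Codec).
Configuration conf = new Configuration();
Path input = new Path("/data/events.log.bz2"); // hypothetical input path
CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(input);
boolean splittable =
    (codec == null) || (codec instanceof SplittableCompressionCodec);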
public InputStream createDecompressionStream(InputStream downStream,
    Decompressor decompressor, int downStreamBufferSize) throws IOException {
  CompressionCodec codec = getCodec(conf);
  // Set the internal buffer size to read from down stream.
  if (downStreamBufferSize > 0) {
    ((Configurable) codec).getConf().setInt("io.file.buffer.size",
        downStreamBufferSize);
  }
  CompressionInputStream cis =
      codec.createInputStream(downStream, decompressor);
  BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
  return bis2;
}
@Override
public void init(Path path, Configuration conf) throws IOException {
  FileSystem fs = path.getFileSystem(conf);
  CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(fs.create(path), compressor);
  } else {
    output = fs.create(path);
  }
  writer = new JsonObjectMapperWriter<T>(output,
      conf.getBoolean("rumen.output.pretty.print", true));
}
/** Create the named map for keys of the named class.
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              Class<? extends WritableComparable> keyClass,
              Class valClass, CompressionType compress,
              CompressionCodec codec, Progressable progress)
    throws IOException {
  this(conf, new Path(dirName), keyClass(keyClass), valueClass(valClass),
       compression(compress, codec), progressable(progress));
}
/** Create the named map using the named key comparator.
 * @deprecated Use Writer(Configuration, Path, Option...) instead.
 */
@Deprecated
public Writer(Configuration conf, FileSystem fs, String dirName,
              WritableComparator comparator, Class valClass,
              SequenceFile.CompressionType compress,
              CompressionCodec codec, Progressable progress)
    throws IOException {
  this(conf, new Path(dirName), comparator(comparator),
       valueClass(valClass), compression(compress, codec),
       progressable(progress));
}
public SeqFileAppendable(FileSystem fs, Path path, int osBufferSize,
                         String compress, int minBlkSize) throws IOException {
  Configuration conf = new Configuration();
  conf.setBoolean("hadoop.native.lib", true);

  CompressionCodec codec = null;
  if ("lzo".equals(compress)) {
    codec = Compression.Algorithm.LZO.getCodec();
  } else if ("gz".equals(compress)) {
    codec = Compression.Algorithm.GZ.getCodec();
  } else if (!"none".equals(compress)) {
    throw new IOException("Codec not supported.");
  }

  this.fsdos = fs.create(path, true, osBufferSize);

  if (!"none".equals(compress)) {
    writer = SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
        BytesWritable.class, SequenceFile.CompressionType.BLOCK, codec);
  } else {
    writer = SequenceFile.createWriter(conf, fsdos, BytesWritable.class,
        BytesWritable.class, SequenceFile.CompressionType.NONE, null);
  }
}
public void initialize(InputSplit genericSplit, TaskAttemptContext context) {
  try {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength =
        job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);
    this.start = split.getStart();
    this.end = this.start + split.getLength();
    Path file = split.getPath();
    FileSystem fs = file.getFileSystem(job);
    this.fileIn = fs.open(file);
    CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
    if (null != codec) {
      this.isCompressedInput = true;
      this.decompressor = CodecPool.getDecompressor(codec);
      if (codec instanceof SplittableCompressionCodec) {
        SplitCompressionInputStream cIn =
            ((SplittableCompressionCodec) codec).createInputStream(this.fileIn,
                this.decompressor, this.start, this.end,
                SplittableCompressionCodec.READ_MODE.BYBLOCK);
        this.in = new CompressedSplitLineReader(cIn, job, this.recordDelimiterBytes);
        this.start = cIn.getAdjustedStart();
        this.end = cIn.getAdjustedEnd();
        this.filePosition = cIn;
      } else {
        this.in = new SplitLineReader(codec.createInputStream(this.fileIn,
            this.decompressor), job, this.recordDelimiterBytes);
        this.filePosition = this.fileIn;
      }
    } else {
      this.fileIn.seek(this.start);
      this.in = new SplitLineReader(this.fileIn, job, this.recordDelimiterBytes);
      this.filePosition = this.fileIn;
    }
    if (this.start != 0L) {
      this.start += (long) this.in.readLine(new Text(), 0,
          this.maxBytesToConsume(this.start));
    }
    this.pos = this.start;
  } catch (Exception ex) {
    LOG.warn("Exception occurred during initialization {}", ex, ex);
  }
}
/**
 * Set the {@link CompressionCodec} to be used to compress job outputs.
 * @param job the job to modify
 * @param codecClass the {@link CompressionCodec} to be used to
 *                   compress the job outputs
 */
public static void
setOutputCompressorClass(Job job,
                         Class<? extends CompressionCodec> codecClass) {
  setCompressOutput(job, true);
  job.getConfiguration().setClass(FileOutputFormat.COMPRESS_CODEC,
                                  codecClass,
                                  CompressionCodec.class);
}
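// A minimal usage sketch (illustrative job setup, not taken from the snippets
// above): enabling output compression on a job and then resolving the codec
// the same way getOutputCompressorClass() does.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "compressed-output-example"); // hypothetical job name
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

Class<? extends CompressionCodec> codecClass =
    FileOutputFormat.getOutputCompressorClass(job, DefaultCodec.class);
CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);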
private void flush(int count, int bytesProcessed,
                   CompressionType compressionType,
                   CompressionCodec codec,
                   boolean done) throws IOException {
  if (out == null) {
    outName = done ? outFile : outFile.suffix(".0");
    out = fs.create(outName);
    if (!done) {
      indexOut = fs.create(outName.suffix(".index"));
    }
  }

  long segmentStart = out.getPos();
  Writer writer = createWriter(conf, Writer.stream(out),
      Writer.keyClass(keyClass), Writer.valueClass(valClass),
      Writer.compression(compressionType, codec),
      Writer.metadata(done ? metadata : new Metadata()));

  if (!done) {
    writer.sync = null;                     // disable sync on temp files
  }

  for (int i = 0; i < count; i++) {         // write in sorted order
    int p = pointers[i];
    writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
  }
  writer.close();

  if (!done) {
    // Save the segment length
    WritableUtils.writeVLong(indexOut, segmentStart);
    WritableUtils.writeVLong(indexOut, (out.getPos() - segmentStart));
    indexOut.flush();
  }
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
  final CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
  if (null == codec) {
    return true;
  }
  return codec instanceof SplittableCompressionCodec;
}
private static FileStatus decompress(FileSystem fs, String in, String outpath)
    throws IOException {
  Configuration conf = new Configuration();
  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
  CompressionCodec codec = factory.getCodec(new Path(in));
  // Decompress the input file using the codec inferred from its extension.
  InputStream is = codec.createInputStream(fs.open(new Path(in)));
  OutputStream out = fs.create(new Path(outpath));
  // Write the decompressed bytes out.
  IOUtils.copyBytes(is, out, conf);
  is.close();
  out.close();
  return fs.getFileStatus(new Path(outpath));
}
private static void decompress(FileSystem fs, String in, String outpath)
    throws IOException {
  Configuration conf = new Configuration();
  CompressionCodecFactory factory = new CompressionCodecFactory(conf);
  CompressionCodec codec = factory.getCodec(new Path(in));
  // Decompress the input file using the codec inferred from its extension.
  InputStream is = codec.createInputStream(fs.open(new Path(in)));
  OutputStream out = fs.create(new Path(outpath));
  // Write the decompressed bytes out.
  IOUtils.copyBytes(is, out, conf);
  is.close();
  out.close();
}
private CompressionCodec buildCodec(Configuration conf) {
  try {
    Class<?> externalCodec =
        getClassLoaderForCodec().loadClass("com.hadoop.compression.lzo.LzoCodec");
    return (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
        new Configuration(conf));
  } catch (ClassNotFoundException e) {
    throw new RuntimeException(e);
  }
}
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                          String name, Progressable progress)
    throws IOException {
  // get the path of the temporary output file
  Path file = FileOutputFormat.getTaskOutputPath(job, name);

  FileSystem fs = file.getFileSystem(job);
  CompressionCodec codec = null;
  CompressionType compressionType = CompressionType.NONE;
  if (getCompressOutput(job)) {
    // find the kind of compression to do
    compressionType = getOutputCompressionType(job);

    // find the right codec
    Class<? extends CompressionCodec> codecClass =
        getOutputCompressorClass(job, DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  }
  final SequenceFile.Writer out =
      SequenceFile.createWriter(fs, job, file,
                                job.getOutputKeyClass(),
                                job.getOutputValueClass(),
                                compressionType,
                                codec,
                                progress);

  return new RecordWriter<K, V>() {

    public void write(K key, V value) throws IOException {
      out.append(key, value);
    }

    public void close(Reporter reporter) throws IOException {
      out.close();
    }
  };
}
BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
             Context context, String filePath, String fileName,
             String inUsePrefix, String inUseSuffix, String fileSuffix,
             CompressionCodec codeC, CompressionType compType,
             HDFSWriter writer, ScheduledExecutorService timedRollerPool,
             PrivilegedExecutor proxyUser, SinkCounter sinkCounter,
             int idleTimeout, HDFSSink.WriterCallback onCloseCallback,
             String onCloseCallbackPath, long callTimeout,
             ExecutorService callTimeoutPool, long retryInterval,
             int maxCloseTries) {
  this.rollInterval = rollInterval;
  this.rollSize = rollSize;
  this.rollCount = rollCount;
  this.batchSize = batchSize;
  this.filePath = filePath;
  this.fileName = fileName;
  this.inUsePrefix = inUsePrefix;
  this.inUseSuffix = inUseSuffix;
  this.fileSuffix = fileSuffix;
  this.codeC = codeC;
  this.compType = compType;
  this.writer = writer;
  this.timedRollerPool = timedRollerPool;
  this.proxyUser = proxyUser;
  this.sinkCounter = sinkCounter;
  this.idleTimeout = idleTimeout;
  this.onCloseCallback = onCloseCallback;
  this.onCloseCallbackPath = onCloseCallbackPath;
  this.callTimeout = callTimeout;
  this.callTimeoutPool = callTimeoutPool;
  fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());

  this.retryInterval = retryInterval;
  this.maxRenameTries = maxCloseTries;
  isOpen = false;
  isUnderReplicated = false;
  this.writer.configure(context);
}
public InputStream openPossiblyCompressedStream(Path path) throws IOException {
  CompressionCodec codec = codecFactory.getCodec(path); // infers from file ext.
  if (codec != null) {
    return codec.createInputStream(open(path));
  } else {
    return open(path);
  }
}
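// A minimal standalone equivalent (the input path is illustrative; the
// CompressionCodecFactory and FileSystem setup are assumptions, since the
// snippet above relies on fields of its enclosing class): open a file and
// transparently decompress it when a codec is registered for its extension.
Configuration conf = new Configuration();
Path path = new Path("/logs/app.log.gz"); // hypothetical input
FileSystem fs = path.getFileSystem(conf);
CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path);
InputStream raw = fs.open(path);
InputStream in = (codec != null) ? codec.createInputStream(raw) : raw;
try (BufferedReader reader =
         new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
  String line;
  while ((line = reader.readLine()) != null) {
    // process each decompressed line
  }
}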