@Override /** {@inheritDoc} */ public InputStream readBlobRecord() throws IOException { if (!isRecordAvailable()) { // we're not currently aligned on a record-start. // Try to get the next one. if (!next()) { // No more records available. throw new EOFException("End of file reached."); } } // Ensure any previously-open user record stream is closed. closeUserStream(); // Mark this record as consumed. this.isAligned = false; // The length of the stream we can return to the user is // the indexRecordLen minus the length of any per-record headers. // That includes the RecordStartMark, the entryId, and the claimedLen. long streamLen = this.indexRecordLen - RecordStartMark.START_MARK_LENGTH - WritableUtils.getVIntSize(this.curEntryId) - WritableUtils.getVIntSize(this.claimedRecordLen); LOG.debug("Yielding stream to user with length " + streamLen); this.userInputStream = new FixedLengthInputStream(this.dataIn, streamLen); if (this.codec != null) { // The user needs to decompress the data; wrap the InputStream. decompressor.reset(); this.userInputStream = new DecompressorStream( this.userInputStream, decompressor); } return this.userInputStream; }
@Override public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException { return new DecompressorStream(in, decompressor, getBufferSize()); }
@Override public CompressionInputStream createInputStream(InputStream in, Decompressor decompressor) throws IOException { Preconditions.checkArgument(decompressor instanceof D2Decompressor, "Requires a %s", D2Decompressor.class); // prepare the stream to strip the footer return new DecompressorStream(D2Utils.prepareD2Stream(in), decompressor); }