Java 类org.apache.hadoop.io.compress.CodecPool 实例源码

项目:hadoop-oss    文件:SequenceFile.java   
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }

  CodecPool.returnCompressor(compressor);
  compressor = null;

  if (out != null) {

    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
项目:hadoop-oss    文件:SequenceFile.java   
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;

  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }

  // Close the input-stream
  in.close();
}
项目:hadoop-oss    文件:Compression.java   
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returns the compressor to CodecPool but is still using
        // it.
        LOG.warn("Compressor obtained from CodecPool already finished()");
      } else {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got a compressor: " + compressor.hashCode());
        }
      }
      /**
       * Following statement is necessary to get around bugs in 0.18 where a
       * compressor is referenced after returned back to the codec pool.
       */
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
项目:hadoop-oss    文件:Compression.java   
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returns the decompressor to CodecPool but is still using
        // it.
        LOG.warn("Deompressor obtained from CodecPool already finished()");
      } else {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got a decompressor: " + decompressor.hashCode());
        }
      }
      /**
       * Following statement is necessary to get around bugs in 0.18 where a
       * decompressor is referenced after returned back to the codec pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }

  return null;
}
项目:flume-release-1.7.0    文件:HDFSCompressedDataStream.java   
@Override
public void close() throws IOException {
  serializer.flush();
  serializer.beforeClose();
  if (!isFinished) {
    cmpOut.finish();
    isFinished = true;
  }
  fsOut.flush();
  hflushOrSync(fsOut);
  cmpOut.close();
  if (compressor != null) {
    CodecPool.returnCompressor(compressor);
    compressor = null;
  }
  unregisterCurrentStream();
}
项目:hadoop    文件:IFile.java   
/**
 * Construct an IFile Reader.
 * 
 * @param conf Configuration File 
 * @param in   The input stream
 * @param length Length of the data in the stream, including the checksum
 *               bytes.
 * @param codec codec
 * @param readsCounter Counter for records read from disk
 * @throws IOException
 */
public Reader(Configuration conf, FSDataInputStream in, long length, 
              CompressionCodec codec,
              Counters.Counter readsCounter) throws IOException {
  readRecordsCounter = readsCounter;
  checksumIn = new IFileInputStream(in,length, conf);
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      this.in = codec.createInputStream(checksumIn, decompressor);
    } else {
      LOG.warn("Could not obtain decompressor from CodecPool");
      this.in = checksumIn;
    }
  } else {
    this.in = checksumIn;
  }
  this.dataIn = new DataInputStream(this.in);
  this.fileLength = length;

  if (conf != null) {
    bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
  }
}
项目:hadoop    文件:IFile.java   
public void close() throws IOException {
  // Close the underlying stream
  in.close();

  // Release the buffer
  dataIn = null;
  buffer = null;
  if(readRecordsCounter != null) {
    readRecordsCounter.increment(numRecordsRead);
  }

  // Return the decompressor
  if (decompressor != null) {
    decompressor.reset();
    CodecPool.returnDecompressor(decompressor);
    decompressor = null;
  }
}
项目:hadoop    文件:InMemoryMapOutput.java   
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
                         MergeManagerImpl<K, V> merger,
                         int size, CompressionCodec codec,
                         boolean primaryMapOutput) {
  super(mapId, (long)size, primaryMapOutput);
  this.conf = conf;
  this.merger = merger;
  this.codec = codec;
  byteStream = new BoundedByteArrayOutputStream(size);
  memory = byteStream.getBuffer();
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
  } else {
    decompressor = null;
  }
}
项目:hadoop    文件:PossiblyDecompressedInputStream.java   
public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
    throws IOException {
  CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
  CompressionCodec inputCodec = codecs.getCodec(inputPath);

  FileSystem ifs = inputPath.getFileSystem(conf);
  FSDataInputStream fileIn = ifs.open(inputPath);

  if (inputCodec == null) {
    decompressor = null;
    coreInputStream = fileIn;
  } else {
    decompressor = CodecPool.getDecompressor(inputCodec);
    coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
  }
}
项目:hadoop    文件:Anonymizer.java   
private JsonGenerator createJsonGenerator(Configuration conf, Path path) 
throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
    new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(outFS.create(path), compressor);
  } else {
    output = outFS.create(path);
  }

  JsonGenerator outGen = outFactory.createJsonGenerator(output, 
                                                        JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();

  return outGen;
}
项目:hadoop    文件:SequenceFile.java   
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }

  CodecPool.returnCompressor(compressor);
  compressor = null;

  if (out != null) {

    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
项目:hadoop    文件:SequenceFile.java   
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;

  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }

  // Close the input-stream
  in.close();
}
项目:hadoop    文件:Compression.java   
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returns the compressor to CodecPool but is still using
        // it.
        LOG.warn("Compressor obtained from CodecPool already finished()");
      } else {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got a compressor: " + compressor.hashCode());
        }
      }
      /**
       * Following statement is necessary to get around bugs in 0.18 where a
       * compressor is referenced after returned back to the codec pool.
       */
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
项目:hadoop    文件:Compression.java   
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returns the decompressor to CodecPool but is still using
        // it.
        LOG.warn("Deompressor obtained from CodecPool already finished()");
      } else {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got a decompressor: " + decompressor.hashCode());
        }
      }
      /**
       * Following statement is necessary to get around bugs in 0.18 where a
       * decompressor is referenced after returned back to the codec pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }

  return null;
}
项目:ditb    文件:Compression.java   
public Compressor getCompressor() {
  CompressionCodec codec = getCodec(conf);
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (LOG.isTraceEnabled()) LOG.trace("Retrieved compressor " + compressor + " from pool.");
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returns the compressor to CodecPool but is still using it.
        LOG.warn("Compressor obtained from CodecPool is already finished()");
      }
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
项目:ditb    文件:Compression.java   
public Decompressor getDecompressor() {
  CompressionCodec codec = getCodec(conf);
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (LOG.isTraceEnabled()) LOG.trace("Retrieved decompressor " + decompressor
        + " from pool.");
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returns the decompressor to CodecPool but is still using it.
        LOG.warn("Deompressor obtained from CodecPool is already finished()");
      }
      decompressor.reset();
    }
    return decompressor;
  }

  return null;
}
项目:aliyun-oss-hadoop-fs    文件:IFile.java   
/**
 * Construct an IFile Reader.
 * 
 * @param conf Configuration File 
 * @param in   The input stream
 * @param length Length of the data in the stream, including the checksum
 *               bytes.
 * @param codec codec
 * @param readsCounter Counter for records read from disk
 * @throws IOException
 */
public Reader(Configuration conf, FSDataInputStream in, long length, 
              CompressionCodec codec,
              Counters.Counter readsCounter) throws IOException {
  readRecordsCounter = readsCounter;
  checksumIn = new IFileInputStream(in,length, conf);
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      this.in = codec.createInputStream(checksumIn, decompressor);
    } else {
      LOG.warn("Could not obtain decompressor from CodecPool");
      this.in = checksumIn;
    }
  } else {
    this.in = checksumIn;
  }
  this.dataIn = new DataInputStream(this.in);
  this.fileLength = length;

  if (conf != null) {
    bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
  }
}
项目:aliyun-oss-hadoop-fs    文件:IFile.java   
public void close() throws IOException {
  // Close the underlying stream
  in.close();

  // Release the buffer
  dataIn = null;
  buffer = null;
  if(readRecordsCounter != null) {
    readRecordsCounter.increment(numRecordsRead);
  }

  // Return the decompressor
  if (decompressor != null) {
    decompressor.reset();
    CodecPool.returnDecompressor(decompressor);
    decompressor = null;
  }
}
项目:aliyun-oss-hadoop-fs    文件:PossiblyDecompressedInputStream.java   
public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
    throws IOException {
  CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
  CompressionCodec inputCodec = codecs.getCodec(inputPath);

  FileSystem ifs = inputPath.getFileSystem(conf);
  FSDataInputStream fileIn = ifs.open(inputPath);

  if (inputCodec == null) {
    decompressor = null;
    coreInputStream = fileIn;
  } else {
    decompressor = CodecPool.getDecompressor(inputCodec);
    coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
  }
}
项目:aliyun-oss-hadoop-fs    文件:Anonymizer.java   
private JsonGenerator createJsonGenerator(Configuration conf, Path path) 
throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
    new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(outFS.create(path), compressor);
  } else {
    output = outFS.create(path);
  }

  JsonGenerator outGen = outFactory.createJsonGenerator(output, 
                                                        JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();

  return outGen;
}
项目:aliyun-oss-hadoop-fs    文件:SequenceFile.java   
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }

  CodecPool.returnCompressor(compressor);
  compressor = null;

  if (out != null) {

    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
项目:aliyun-oss-hadoop-fs    文件:SequenceFile.java   
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;

  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }

  // Close the input-stream
  in.close();
}
项目:aliyun-oss-hadoop-fs    文件:Compression.java   
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returns the compressor to CodecPool but is still using
        // it.
        LOG.warn("Compressor obtained from CodecPool already finished()");
      } else {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got a compressor: " + compressor.hashCode());
        }
      }
      /**
       * Following statement is necessary to get around bugs in 0.18 where a
       * compressor is referenced after returned back to the codec pool.
       */
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
项目:aliyun-oss-hadoop-fs    文件:Compression.java   
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returns the decompressor to CodecPool but is still using
        // it.
        LOG.warn("Deompressor obtained from CodecPool already finished()");
      } else {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got a decompressor: " + decompressor.hashCode());
        }
      }
      /**
       * Following statement is necessary to get around bugs in 0.18 where a
       * decompressor is referenced after returned back to the codec pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }

  return null;
}
项目:gemfirexd-oss    文件:SequenceFile.java   
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }

  CodecPool.returnCompressor(compressor);
  compressor = null;

  if (out != null) {

    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
项目:gemfirexd-oss    文件:SequenceFile.java   
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;

  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }

  // Close the input-stream
  in.close();
}
项目:big-c    文件:IFile.java   
/**
 * Construct an IFile Reader.
 * 
 * @param conf Configuration File 
 * @param in   The input stream
 * @param length Length of the data in the stream, including the checksum
 *               bytes.
 * @param codec codec
 * @param readsCounter Counter for records read from disk
 * @throws IOException
 */
public Reader(Configuration conf, FSDataInputStream in, long length, 
              CompressionCodec codec,
              Counters.Counter readsCounter) throws IOException {
  readRecordsCounter = readsCounter;
  checksumIn = new IFileInputStream(in,length, conf);
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      this.in = codec.createInputStream(checksumIn, decompressor);
    } else {
      LOG.warn("Could not obtain decompressor from CodecPool");
      this.in = checksumIn;
    }
  } else {
    this.in = checksumIn;
  }
  this.dataIn = new DataInputStream(this.in);
  this.fileLength = length;

  if (conf != null) {
    bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
  }
}
项目:big-c    文件:IFile.java   
public void close() throws IOException {
  // Close the underlying stream
  in.close();

  // Release the buffer
  dataIn = null;
  buffer = null;
  if(readRecordsCounter != null) {
    readRecordsCounter.increment(numRecordsRead);
  }

  // Return the decompressor
  if (decompressor != null) {
    decompressor.reset();
    CodecPool.returnDecompressor(decompressor);
    decompressor = null;
  }
}
项目:big-c    文件:InMemoryMapOutput.java   
public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
                         MergeManagerImpl<K, V> merger,
                         int size, CompressionCodec codec,
                         boolean primaryMapOutput) {
  super(mapId, (long)size, primaryMapOutput);
  this.conf = conf;
  this.merger = merger;
  this.codec = codec;
  byteStream = new BoundedByteArrayOutputStream(size);
  memory = byteStream.getBuffer();
  if (codec != null) {
    decompressor = CodecPool.getDecompressor(codec);
  } else {
    decompressor = null;
  }
}
项目:big-c    文件:PossiblyDecompressedInputStream.java   
public PossiblyDecompressedInputStream(Path inputPath, Configuration conf)
    throws IOException {
  CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
  CompressionCodec inputCodec = codecs.getCodec(inputPath);

  FileSystem ifs = inputPath.getFileSystem(conf);
  FSDataInputStream fileIn = ifs.open(inputPath);

  if (inputCodec == null) {
    decompressor = null;
    coreInputStream = fileIn;
  } else {
    decompressor = CodecPool.getDecompressor(inputCodec);
    coreInputStream = inputCodec.createInputStream(fileIn, decompressor);
  }
}
项目:big-c    文件:Anonymizer.java   
private JsonGenerator createJsonGenerator(Configuration conf, Path path) 
throws IOException {
  FileSystem outFS = path.getFileSystem(conf);
  CompressionCodec codec =
    new CompressionCodecFactory(conf).getCodec(path);
  OutputStream output;
  Compressor compressor = null;
  if (codec != null) {
    compressor = CodecPool.getCompressor(codec);
    output = codec.createOutputStream(outFS.create(path), compressor);
  } else {
    output = outFS.create(path);
  }

  JsonGenerator outGen = outFactory.createJsonGenerator(output, 
                                                        JsonEncoding.UTF8);
  outGen.useDefaultPrettyPrinter();

  return outGen;
}
项目:big-c    文件:SequenceFile.java   
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  keySerializer.close();
  uncompressedValSerializer.close();
  if (compressedValSerializer != null) {
    compressedValSerializer.close();
  }

  CodecPool.returnCompressor(compressor);
  compressor = null;

  if (out != null) {

    // Close the underlying stream iff we own it...
    if (ownOutputStream) {
      out.close();
    } else {
      out.flush();
    }
    out = null;
  }
}
项目:big-c    文件:SequenceFile.java   
/** Close the file. */
@Override
public synchronized void close() throws IOException {
  // Return the decompressors to the pool
  CodecPool.returnDecompressor(keyLenDecompressor);
  CodecPool.returnDecompressor(keyDecompressor);
  CodecPool.returnDecompressor(valLenDecompressor);
  CodecPool.returnDecompressor(valDecompressor);
  keyLenDecompressor = keyDecompressor = null;
  valLenDecompressor = valDecompressor = null;

  if (keyDeserializer != null) {
    keyDeserializer.close();
  }
  if (valDeserializer != null) {
    valDeserializer.close();
  }

  // Close the input-stream
  in.close();
}
项目:big-c    文件:Compression.java   
public Compressor getCompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Compressor compressor = CodecPool.getCompressor(codec);
    if (compressor != null) {
      if (compressor.finished()) {
        // Somebody returns the compressor to CodecPool but is still using
        // it.
        LOG.warn("Compressor obtained from CodecPool already finished()");
      } else {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got a compressor: " + compressor.hashCode());
        }
      }
      /**
       * Following statement is necessary to get around bugs in 0.18 where a
       * compressor is referenced after returned back to the codec pool.
       */
      compressor.reset();
    }
    return compressor;
  }
  return null;
}
项目:big-c    文件:Compression.java   
public Decompressor getDecompressor() throws IOException {
  CompressionCodec codec = getCodec();
  if (codec != null) {
    Decompressor decompressor = CodecPool.getDecompressor(codec);
    if (decompressor != null) {
      if (decompressor.finished()) {
        // Somebody returns the decompressor to CodecPool but is still using
        // it.
        LOG.warn("Deompressor obtained from CodecPool already finished()");
      } else {
        if(LOG.isDebugEnabled()) {
          LOG.debug("Got a decompressor: " + decompressor.hashCode());
        }
      }
      /**
       * Following statement is necessary to get around bugs in 0.18 where a
       * decompressor is referenced after returned back to the codec pool.
       */
      decompressor.reset();
    }
    return decompressor;
  }

  return null;
}
项目:hadoopoffice    文件:MapReduceExcelOutputIntegrationTest.java   
private InputStream openFile(Path path) throws IOException {
        CompressionCodec codec=new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
    FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path);
    // check if compressed
    if (codec==null) { // uncompressed
        return fileIn;
    } else { // compressed
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        this.openDecompressors.add(decompressor); // to be returned later using close
        if (codec instanceof SplittableCompressionCodec) {
            long end = dfsCluster.getFileSystem().getFileStatus(path).getLen(); 
                final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
                    return cIn;
            } else {
                return codec.createInputStream(fileIn,decompressor);
            }
    }
}
项目:hadoopoffice    文件:MapReduceExcelInputIntegrationTest.java   
private InputStream openFile(Path path) throws IOException {
        CompressionCodec codec=new CompressionCodecFactory(miniCluster.getConfig()).getCodec(path);
    FSDataInputStream fileIn=dfsCluster.getFileSystem().open(path);
    // check if compressed
    if (codec==null) { // uncompressed
        return fileIn;
    } else { // compressed
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        this.openDecompressors.add(decompressor); // to be returned later using close
        if (codec instanceof SplittableCompressionCodec) {
            long end = dfsCluster.getFileSystem().getFileStatus(path).getLen(); 
                final SplitCompressionInputStream cIn =((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
                    return cIn;
            } else {
                return codec.createInputStream(fileIn,decompressor);
            }
    }
}
项目:hadoopoffice    文件:AbstractSpreadSheetDocumentRecordReader.java   
@Override
public synchronized void  close() throws IOException {
try {
    if (officeReader!=null) {
    officeReader.close();
     }
    } finally {
      if (decompressor != null) { // return this decompressor
        CodecPool.returnDecompressor(decompressor);
        decompressor = null;
      } // return decompressor of linked workbooks
    if (this.currentHFR!=null) {
        currentHFR.close();
    }
    }
    // do not close the filesystem! will cause exceptions in Spark
}
项目:hadoopoffice    文件:AbstractSpreadSheetDocumentRecordReader.java   
@Override
public synchronized void  close() throws IOException {
try {
    if (officeReader!=null) {
    officeReader.close();
     }
    } finally {
      if (decompressor != null) { // return this decompressor
        CodecPool.returnDecompressor(decompressor);
        decompressor = null;
      } // return decompressor of linked workbooks
    if (this.currentHFR!=null) {
        currentHFR.close();
    }
    }
    // do not close the filesystem! will cause exceptions in Spark

}
项目:hadoopoffice    文件:HadoopFileReader.java   
public InputStream openFile(Path path) throws IOException {
        CompressionCodec codec=compressionCodecs.getCodec(path);
    FSDataInputStream fileIn=fs.open(path);
    // check if compressed
    if (codec==null) { // uncompressed
    LOG.debug("Reading from an uncompressed file \""+path+"\"");
        return fileIn;
    } else { // compressed
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        this.openDecompressors.add(decompressor); // to be returned later using close
        if (codec instanceof SplittableCompressionCodec) {
            LOG.debug("Reading from a compressed file \""+path+"\" with splittable compression codec");
            long end = fs.getFileStatus(path).getLen(); 
                return ((SplittableCompressionCodec)codec).createInputStream(fileIn, decompressor, 0, end,SplittableCompressionCodec.READ_MODE.CONTINUOUS);
            } else {
            LOG.debug("Reading from a compressed file \""+path+"\" with non-splittable compression codec");
                return codec.createInputStream(fileIn,decompressor);
            }
    }
}