Example usages of the Java class org.apache.hadoop.mapred.IFileInputStream
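The snippets below, gathered from several Hadoop forks, show how org.apache.hadoop.mapred.IFileInputStream is used on the reduce side of the shuffle to read a map-output segment while verifying the IFile checksum that trails the data. As a quick orientation, here is a minimal sketch of the checksummed copy loop that the OnDiskMapOutput examples use; the file path, segment length, output sink, and Configuration are placeholders, not taken from the source.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.IFileInputStream;

public class IFileCopySketch {
  // Sketch only: copies an IFile segment of known compressed length from
  // 'path' to 'sink', letting IFileInputStream check the trailing checksum
  // (mirrors the OnDiskMapOutput.doShuffle snippets below).
  public static void copySegment(String path, long compressedLength,
                                 OutputStream sink, Configuration conf)
      throws IOException {
    InputStream raw = new FileInputStream(path);
    IFileInputStream input = new IFileInputStream(raw, compressedLength, conf);
    try {
      final int BYTES_TO_READ = 64 * 1024;
      byte[] buf = new byte[BYTES_TO_READ];
      long bytesLeft = compressedLength;
      while (bytesLeft > 0) {
        int n = input.readWithChecksum(buf, 0,
            (int) Math.min(bytesLeft, BYTES_TO_READ));
        if (n < 0) {
          throw new IOException("read past end of stream");
        }
        sink.write(buf, 0, n);
        bytesLeft -= n;
      }
    } finally {
      input.close();
    }
  }
}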

Project: aliyun-oss-hadoop-fs    File: InMemoryMapOutput.java
@Override
protected void doShuffle(MapHost host, IFileInputStream iFin,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  InputStream input = iFin;

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
              getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0 ) {
      throw new IOException("Unexpected extra bytes from input stream for " +
                             getMapId());
    }
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
Project: aliyun-oss-hadoop-fs    File: IFileWrappedMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  doShuffle(host, new IFileInputStream(input, compressedLength, conf),
      compressedLength, decompressedLength, metrics, reporter);
}
Project: hops    File: InMemoryMapOutput.java
@Override
protected void doShuffle(MapHost host, IFileInputStream iFin,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  InputStream input = iFin;

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
              getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0 ) {
      throw new IOException("Unexpected extra bytes from input stream for " +
                             getMapId());
    }
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
Project: hops    File: IFileWrappedMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  doShuffle(host, new IFileInputStream(input, compressedLength, conf),
      compressedLength, decompressedLength, metrics, reporter);
}
Project: mapreduce-fork    File: Fetcher.java
private void shuffleToMemory(MapHost host, MapOutput<K,V> mapOutput, 
                             InputStream input, 
                             int decompressedLength, 
                             int compressedLength) throws IOException {    
  IFileInputStream checksumIn = 
    new IFileInputStream(input, compressedLength);

  input = checksumIn;       

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  // Copy map-output into an in-memory buffer
  byte[] shuffleData = mapOutput.getMemory();

  try {
    IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
    metrics.inputBytes(shuffleData.length);
    reporter.progress();
    LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
             mapOutput.getMapId());
  } catch (IOException ioe) {      
    // Close the streams
    IOUtils.cleanup(LOG, input);

    // Re-throw
    throw ioe;
  }

}
Project: hadoop    File: InMemoryMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  IFileInputStream checksumIn = 
    new IFileInputStream(input, compressedLength, conf);

  input = checksumIn;       

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
              getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0 ) {
      throw new IOException("Unexpected extra bytes from input stream for " +
                             getMapId());
    }

  } catch (IOException ioe) {      
    // Close the streams
    IOUtils.cleanup(LOG, input);

    // Re-throw
    throw ioe;
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
Project: aliyun-oss-hadoop-fs    File: OnDiskMapOutput.java
@Override
protected void doShuffle(MapHost host, IFileInputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  // Copy data to local-disk
  long bytesLeft = compressedLength;
  try {
    final int BYTES_TO_READ = 64 * 1024;
    byte[] buf = new byte[BYTES_TO_READ];
    while (bytesLeft > 0) {
      int n = input.readWithChecksum(buf, 0,
                                    (int) Math.min(bytesLeft, BYTES_TO_READ));
      if (n < 0) {
        throw new IOException("read past end of stream reading " + 
                              getMapId());
      }
      disk.write(buf, 0, n);
      bytesLeft -= n;
      metrics.inputBytes(n);
      reporter.progress();
    }

    LOG.info("Read " + (compressedLength - bytesLeft) + 
             " bytes from map-output for " + getMapId());

    disk.close();
  } catch (IOException ioe) {
    // Close the streams
    IOUtils.cleanup(LOG, disk);

    // Re-throw
    throw ioe;
  }

  // Sanity check
  if (bytesLeft != 0) {
    throw new IOException("Incomplete map output received for " +
                          getMapId() + " from " +
                          host.getHostName() + " (" + 
                          bytesLeft + " bytes missing of " + 
                          compressedLength + ")");
  }
  this.compressedSize = compressedLength;
}
Project: aliyun-oss-hadoop-fs    File: IFileWrappedMapOutput.java
protected abstract void doShuffle(
    MapHost host, IFileInputStream iFileInputStream,
    long compressedLength, long decompressedLength,
    ShuffleClientMetrics metrics, Reporter reporter) throws IOException;
Project: aliyun-oss-hadoop-fs    File: TestFetcher.java
@Test
public void testCorruptedIFile() throws Exception {
  final int fetcher = 7;
  Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
  Path shuffledToDisk =
      OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
  fs = FileSystem.getLocal(job).getRaw();
  IFileWrappedMapOutput<Text,Text> odmo =
      new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job, fetcher, true,
                                     fs, onDiskMapOutputPath);

  String mapData = "MAPDATA12345678901234567890";

  ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);
  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  DataOutputStream dos = new DataOutputStream(bout);
  IFileOutputStream ios = new IFileOutputStream(dos);
  header.write(dos);

  int headerSize = dos.size();
  try {
    ios.write(mapData.getBytes());
  } finally {
    ios.close();
  }

  int dataSize = bout.size() - headerSize;

  // Ensure that the OnDiskMapOutput shuffler can successfully read the data.
  MapHost host = new MapHost("TestHost", "http://test/url");
  ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
  try {
    // Read past the shuffle header.
    bin.read(new byte[headerSize], 0, headerSize);
    odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
  } finally {
    bin.close();
  }

  // Now corrupt the IFile data.
  byte[] corrupted = bout.toByteArray();
  corrupted[headerSize + (dataSize / 2)] = 0x0;

  try {
    bin = new ByteArrayInputStream(corrupted);
    // Read past the shuffle header.
    bin.read(new byte[headerSize], 0, headerSize);
    odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
    fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
  } catch(ChecksumException e) {
    LOG.info("The expected checksum exception was thrown.", e);
  } finally {
    bin.close();
  }

  // Ensure that the shuffled file can be read.
  IFileInputStream iFin = new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
  try {
    iFin.read(new byte[dataSize], 0, dataSize);
  } finally {
    iFin.close();
  }
}
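For reference, the write/read round trip this test exercises can be reduced to the following sketch, using only the calls that appear above; the payload and Configuration are placeholders, and on corruption of the segment bytes IFileInputStream should raise the same ChecksumException the test asserts.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.IFileOutputStream;

public class IFileRoundTripSketch {
  public static void main(String[] args) throws IOException {
    byte[] payload = "MAPDATA12345678901234567890".getBytes();

    // Write: IFileOutputStream appends the IFile checksum when closed.
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    IFileOutputStream ios = new IFileOutputStream(new DataOutputStream(bout));
    try {
      ios.write(payload);
    } finally {
      ios.close();
    }

    // Read: pass the full segment length (data plus checksum), as the test
    // above does with dataSize.
    byte[] segment = bout.toByteArray();
    IFileInputStream iFin = new IFileInputStream(
        new ByteArrayInputStream(segment), segment.length, new Configuration());
    try {
      byte[] back = new byte[payload.length];
      iFin.read(back, 0, back.length);
    } finally {
      iFin.close();
    }
  }
}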
Project: big-c    File: InMemoryMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  IFileInputStream checksumIn = 
    new IFileInputStream(input, compressedLength, conf);

  input = checksumIn;       

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
              getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0 ) {
      throw new IOException("Unexpected extra bytes from input stream for " +
                             getMapId());
    }

  } catch (IOException ioe) {      
    // Close the streams
    IOUtils.cleanup(LOG, input);

    // Re-throw
    throw ioe;
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
Project: big-c    File: TestFetcher.java
@Test
public void testCorruptedIFile() throws Exception {
  final int fetcher = 7;
  Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
  Path shuffledToDisk =
      OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
  fs = FileSystem.getLocal(job).getRaw();
  MapOutputFile mof = mock(MapOutputFile.class);
  OnDiskMapOutput<Text,Text> odmo = new OnDiskMapOutput<Text,Text>(map1ID,
      id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);

  String mapData = "MAPDATA12345678901234567890";

  ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);
  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  DataOutputStream dos = new DataOutputStream(bout);
  IFileOutputStream ios = new IFileOutputStream(dos);
  header.write(dos);

  int headerSize = dos.size();
  try {
    ios.write(mapData.getBytes());
  } finally {
    ios.close();
  }

  int dataSize = bout.size() - headerSize;

  // Ensure that the OnDiskMapOutput shuffler can successfully read the data.
  MapHost host = new MapHost("TestHost", "http://test/url");
  ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
  try {
    // Read past the shuffle header.
    bin.read(new byte[headerSize], 0, headerSize);
    odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
  } finally {
    bin.close();
  }

  // Now corrupt the IFile data.
  byte[] corrupted = bout.toByteArray();
  corrupted[headerSize + (dataSize / 2)] = 0x0;

  try {
    bin = new ByteArrayInputStream(corrupted);
    // Read past the shuffle header.
    bin.read(new byte[headerSize], 0, headerSize);
    odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
    fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
  } catch(ChecksumException e) {
    LOG.info("The expected checksum exception was thrown.", e);
  } finally {
    bin.close();
  }

  // Ensure that the shuffled file can be read.
  IFileInputStream iFin = new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
  try {
    iFin.read(new byte[dataSize], 0, dataSize);
  } finally {
    iFin.close();
  }
}
Project: hadoop-2.6.0-cdh5.4.3    File: InMemoryMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  IFileInputStream checksumIn = 
    new IFileInputStream(input, compressedLength, conf);

  input = checksumIn;       

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
              getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0 ) {
      throw new IOException("Unexpected extra bytes from input stream for " +
                             getMapId());
    }

  } catch (IOException ioe) {      
    // Close the streams
    IOUtils.cleanup(LOG, input);

    // Re-throw
    throw ioe;
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
Project: hadoop-plus    File: InMemoryMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  IFileInputStream checksumIn = 
    new IFileInputStream(input, compressedLength, conf);

  input = checksumIn;       

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
              getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0 ) {
      throw new IOException("Unexpected extra bytes from input stream for " +
                             getMapId());
    }

  } catch (IOException ioe) {      
    // Close the streams
    IOUtils.cleanup(LOG, input);

    // Re-throw
    throw ioe;
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
Project: FlexMap    File: InMemoryMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  IFileInputStream checksumIn = 
    new IFileInputStream(input, compressedLength, conf);

  input = checksumIn;       

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
              getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0 ) {
      throw new IOException("Unexpected extra bytes from input stream for " +
                             getMapId());
    }

  } catch (IOException ioe) {      
    // Close the streams
    IOUtils.cleanup(LOG, input);

    // Re-throw
    throw ioe;
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
Project: hops    File: OnDiskMapOutput.java
@Override
protected void doShuffle(MapHost host, IFileInputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  // Copy data to local-disk
  long bytesLeft = compressedLength;
  try {
    final int BYTES_TO_READ = 64 * 1024;
    byte[] buf = new byte[BYTES_TO_READ];
    while (bytesLeft > 0) {
      int n = input.readWithChecksum(buf, 0,
                                    (int) Math.min(bytesLeft, BYTES_TO_READ));
      if (n < 0) {
        throw new IOException("read past end of stream reading " + 
                              getMapId());
      }
      disk.write(buf, 0, n);
      bytesLeft -= n;
      metrics.inputBytes(n);
      reporter.progress();
    }

    LOG.info("Read " + (compressedLength - bytesLeft) + 
             " bytes from map-output for " + getMapId());

    disk.close();
  } catch (IOException ioe) {
    // Close the streams
    IOUtils.cleanup(LOG, disk);

    // Re-throw
    throw ioe;
  }

  // Sanity check
  if (bytesLeft != 0) {
    throw new IOException("Incomplete map output received for " +
                          getMapId() + " from " +
                          host.getHostName() + " (" + 
                          bytesLeft + " bytes missing of " + 
                          compressedLength + ")");
  }
  this.compressedSize = compressedLength;
}
Project: hops    File: IFileWrappedMapOutput.java
protected abstract void doShuffle(
    MapHost host, IFileInputStream iFileInputStream,
    long compressedLength, long decompressedLength,
    ShuffleClientMetrics metrics, Reporter reporter) throws IOException;
Project: hops    File: TestFetcher.java
@Test
public void testCorruptedIFile() throws Exception {
  final int fetcher = 7;
  Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
  Path shuffledToDisk =
      OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
  fs = FileSystem.getLocal(job).getRaw();
  IFileWrappedMapOutput<Text,Text> odmo =
      new OnDiskMapOutput<Text,Text>(map1ID, mm, 100L, job, fetcher, true,
                                     fs, onDiskMapOutputPath);

  String mapData = "MAPDATA12345678901234567890";

  ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14, 10, 1);
  ByteArrayOutputStream bout = new ByteArrayOutputStream();
  DataOutputStream dos = new DataOutputStream(bout);
  IFileOutputStream ios = new IFileOutputStream(dos);
  header.write(dos);

  int headerSize = dos.size();
  try {
    ios.write(mapData.getBytes());
  } finally {
    ios.close();
  }

  int dataSize = bout.size() - headerSize;

  // Ensure that the OnDiskMapOutput shuffler can successfully read the data.
  MapHost host = new MapHost("TestHost", "http://test/url");
  ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
  try {
    // Read past the shuffle header.
    bin.read(new byte[headerSize], 0, headerSize);
    odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
  } finally {
    bin.close();
  }

  // Now corrupt the IFile data.
  byte[] corrupted = bout.toByteArray();
  corrupted[headerSize + (dataSize / 2)] = 0x0;

  try {
    bin = new ByteArrayInputStream(corrupted);
    // Read past the shuffle header.
    bin.read(new byte[headerSize], 0, headerSize);
    odmo.shuffle(host, bin, dataSize, dataSize, metrics, Reporter.NULL);
    fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
  } catch(ChecksumException e) {
    LOG.info("The expected checksum exception was thrown.", e);
  } finally {
    bin.close();
  }

  // Ensure that the shuffled file can be read.
  IFileInputStream iFin = new IFileInputStream(fs.open(shuffledToDisk), dataSize, job);
  try {
    iFin.read(new byte[dataSize], 0, dataSize);
  } finally {
    iFin.close();
  }
}
Project: hadoop-TCP    File: InMemoryMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  IFileInputStream checksumIn = 
    new IFileInputStream(input, compressedLength, conf);

  input = checksumIn;       

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
              getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0 ) {
      throw new IOException("Unexpected extra bytes from input stream for " +
                             getMapId());
    }

  } catch (IOException ioe) {      
    // Close the streams
    IOUtils.cleanup(LOG, input);

    // Re-throw
    throw ioe;
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
Project: hardfs    File: InMemoryMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  IFileInputStream checksumIn = 
    new IFileInputStream(input, compressedLength, conf);

  input = checksumIn;       

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
              getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0 ) {
      throw new IOException("Unexpected extra bytes from input stream for " +
                             getMapId());
    }

  } catch (IOException ioe) {      
    // Close the streams
    IOUtils.cleanup(LOG, input);

    // Re-throw
    throw ioe;
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
Project: hadoop-on-lustre2    File: InMemoryMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input,
                    long compressedLength, long decompressedLength,
                    ShuffleClientMetrics metrics,
                    Reporter reporter) throws IOException {
  IFileInputStream checksumIn = 
    new IFileInputStream(input, compressedLength, conf);

  input = checksumIn;       

  // Are map-outputs compressed?
  if (codec != null) {
    decompressor.reset();
    input = codec.createInputStream(input, decompressor);
  }

  try {
    IOUtils.readFully(input, memory, 0, memory.length);
    metrics.inputBytes(memory.length);
    reporter.progress();
    LOG.info("Read " + memory.length + " bytes from map-output for " +
              getMapId());

    /**
     * We've gotten the amount of data we were expecting. Verify the
     * decompressor has nothing more to offer. This action also forces the
     * decompressor to read any trailing bytes that weren't critical
     * for decompression, which is necessary to keep the stream
     * in sync.
     */
    if (input.read() >= 0 ) {
      throw new IOException("Unexpected extra bytes from input stream for " +
                             getMapId());
    }

  } catch (IOException ioe) {      
    // Close the streams
    IOUtils.cleanup(LOG, input);

    // Re-throw
    throw ioe;
  } finally {
    CodecPool.returnDecompressor(decompressor);
  }
}
Project: hadoop-on-lustre2    File: InMemoryLinkMapOutput.java
@Override
public void shuffle(MapHost host, InputStream input, long compressedLength,
        long decompressedLength, ShuffleClientMetrics metrics,
        Reporter reporter) throws IOException {

    String mapHostName = host.getHostName().split(":")[0];
    String app_path = conf.get(MRConfig.LOCAL_DIR);
    LOG.debug("original app_path " + app_path);
    String[] app_path_parts = app_path.split("/");
    app_path_parts[app_path_parts.length - 5] = mapHostName;
    StringBuilder builder = new StringBuilder();
    for (String s : app_path_parts) {
        builder.append(s);
        builder.append("/");
    }
    app_path = builder.toString();
    String src = app_path + "output/" + getMapId() + "/file.out";


    File f = new File(src);
    if (f.exists()) {
        LOG.debug("shuffleToLink: the src " + src + " EXIST!");
    }

    //LOG.debug("src file size: "+f.length());

    //input = new FileInputStream(src);
    //input.skip(offset);

    RandomAccessFile raf = new RandomAccessFile(f, "r");
    input = Channels.newInputStream(raf.getChannel().position(offset));

    IFileInputStream checksumIn = new IFileInputStream(input,
            compressedLength, conf);

    input = checksumIn;

    // Are map-outputs compressed?
    if (codec != null) {
        decompressor.reset();
        input = codec.createInputStream(input, decompressor);
    }

    try {
        LOG.debug("offset: " + offset);
        LOG.debug("memory.length: " + memory.length);
        LOG.debug("compressedLength: " + compressedLength);
        LOG.debug("decompressedLength: " + decompressedLength);

        // TO-DO: would offset and length be OK to be int?
        IOUtils.readFully(input, memory, 0, memory.length);
        metrics.inputBytes((int) memory.length);
        reporter.progress();
        LOG.info("Read " + memory.length + " bytes from map-output for "
                + getMapId());

        /**
         * We've gotten the amount of data we were expecting. Verify the
         * decompressor has nothing more to offer. This action also forces
         * the decompressor to read any trailing bytes that weren't critical
         * for decompression, which is necessary to keep the stream in sync.
         */
        //if (input.read() >= 0) {
        //  throw new IOException(
        //          "Unexpected extra bytes from input stream for "
        //                  + getMapId());
        //}
        input.close();
        raf.close();
    } catch (IOException ioe) {
        // Close the streams
        IOUtils.cleanup(LOG, input);

        // Re-throw
        throw ioe;
    } finally {
        CodecPool.returnDecompressor(decompressor);
    }
}