Java 类org.apache.hadoop.mapred.SpillRecord 实例源码

项目:hadoop    文件:LocalFetcher.java   
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  compressedLength -= CryptoUtils.cryptoPadding(job);
  decompressedLength -= CryptoUtils.cryptoPadding(job);

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map " + 
           mapOutput.getMapId() + " decomp: " +
           decompressedLength + " len: " + compressedLength + " to " +
           mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);

  inStream = CryptoUtils.wrapIfNecessary(job, inStream);

  try {
    inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
  } finally {
    try {
      inStream.close();
    } catch (IOException ioe) {
      LOG.warn("IOException closing inputstream from map output: "
          + ioe.toString());
    }
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
项目:aliyun-oss-hadoop-fs    文件:LocalFetcher.java   
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  compressedLength -= CryptoUtils.cryptoPadding(job);
  decompressedLength -= CryptoUtils.cryptoPadding(job);

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map " + 
           mapOutput.getMapId() + " decomp: " +
           decompressedLength + " len: " + compressedLength + " to " +
           mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);
  try {
    inStream = CryptoUtils.wrapIfNecessary(job, inStream);
    inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
        decompressedLength, metrics, reporter);
  } finally {
    IOUtils.cleanup(LOG, inStream);
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
项目:big-c    文件:LocalFetcher.java   
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  compressedLength -= CryptoUtils.cryptoPadding(job);
  decompressedLength -= CryptoUtils.cryptoPadding(job);

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map " + 
           mapOutput.getMapId() + " decomp: " +
           decompressedLength + " len: " + compressedLength + " to " +
           mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);

  inStream = CryptoUtils.wrapIfNecessary(job, inStream);

  try {
    inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
  } finally {
    try {
      inStream.close();
    } catch (IOException ioe) {
      LOG.warn("IOException closing inputstream from map output: "
          + ioe.toString());
    }
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
项目:hadoop-2.6.0-cdh5.4.3    文件:LocalFetcher.java   
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map " + 
           mapOutput.getMapId() + " decomp: " +
           decompressedLength + " len: " + compressedLength + " to " +
           mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);

  inStream = CryptoUtils.wrapIfNecessary(job, inStream);

  try {
    inStream.seek(ir.startOffset);

    mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
  } finally {
    try {
      inStream.close();
    } catch (IOException ioe) {
      LOG.warn("IOException closing inputstream from map output: "
          + ioe.toString());
    }
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
项目:FlexMap    文件:LocalFetcher.java   
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map " + 
           mapOutput.getMapId() + " decomp: " +
           decompressedLength + " len: " + compressedLength + " to " +
           mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);

  inStream = CryptoUtils.wrapIfNecessary(job, inStream);

  try {
    inStream.seek(ir.startOffset);

    mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
  } finally {
    try {
      inStream.close();
    } catch (IOException ioe) {
      LOG.warn("IOException closing inputstream from map output: "
          + ioe.toString());
    }
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
项目:hops    文件:LocalFetcher.java   
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  compressedLength -= CryptoUtils.cryptoPadding(job);
  decompressedLength -= CryptoUtils.cryptoPadding(job);

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map " + 
           mapOutput.getMapId() + " decomp: " +
           decompressedLength + " len: " + compressedLength + " to " +
           mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);
  try {
    inStream = CryptoUtils.wrapIfNecessary(job, inStream);
    inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
        decompressedLength, metrics, reporter);
  } finally {
    IOUtils.cleanup(LOG, inStream);
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
项目:hadoop-on-lustre2    文件:LocalFetcher.java   
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map " + 
           mapOutput.getMapId() + " decomp: " +
           decompressedLength + " len: " + compressedLength + " to " +
           mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);
  try {
    inStream.seek(ir.startOffset);

    mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
  } finally {
    try {
      inStream.close();
    } catch (IOException ioe) {
      LOG.warn("IOException closing inputstream from map output: "
          + ioe.toString());
    }
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0,
      mapOutput);
  return true; // successful fetch.
}