/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  compressedLength -= CryptoUtils.cryptoPadding(job);
  decompressedLength -= CryptoUtils.cryptoPadding(job);

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map "
      + mapOutput.getMapId() + " decomp: " + decompressedLength
      + " len: " + compressedLength + " to " + mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);

  inStream = CryptoUtils.wrapIfNecessary(job, inStream);

  try {
    inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
        decompressedLength, metrics, reporter);
  } finally {
    try {
      inStream.close();
    } catch (IOException ioe) {
      LOG.warn("IOException closing inputstream from map output: "
          + ioe.toString());
    }
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  compressedLength -= CryptoUtils.cryptoPadding(job);
  decompressedLength -= CryptoUtils.cryptoPadding(job);

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map "
      + mapOutput.getMapId() + " decomp: " + decompressedLength
      + " len: " + compressedLength + " to " + mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);

  try {
    inStream = CryptoUtils.wrapIfNecessary(job, inStream);
    inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
        decompressedLength, metrics, reporter);
  } finally {
    IOUtils.cleanup(LOG, inStream);
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map "
      + mapOutput.getMapId() + " decomp: " + decompressedLength
      + " len: " + compressedLength + " to " + mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);

  inStream = CryptoUtils.wrapIfNecessary(job, inStream);

  try {
    inStream.seek(ir.startOffset);
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
        decompressedLength, metrics, reporter);
  } finally {
    try {
      inStream.close();
    } catch (IOException ioe) {
      LOG.warn("IOException closing inputstream from map output: "
          + ioe.toString());
    }
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map "
      + mapOutput.getMapId() + " decomp: " + decompressedLength
      + " len: " + compressedLength + " to " + mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);

  try {
    inStream.seek(ir.startOffset);
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
        decompressedLength, metrics, reporter);
  } finally {
    try {
      inStream.close();
    } catch (IOException ioe) {
      LOG.warn("IOException closing inputstream from map output: "
          + ioe.toString());
    }
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0,
      mapOutput);
  return true; // successful fetch.
}
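For orientation, below is a minimal sketch of how a caller might drive copyMapOutput over a worklist of map attempts whose output is available locally. The doCopy name, the Set<TaskAttemptID> parameter, and the back-off-on-WAIT behavior are illustrative assumptions about the surrounding fetcher class (imports of java.util.Set and java.util.Iterator assumed), not a definitive rendering of its actual API.

// Sketch only: assumes this method lives in the same fetcher class as
// copyMapOutput above, with `maps` holding attempts whose output is local.
private void doCopy(Set<TaskAttemptID> maps) throws IOException {
  Iterator<TaskAttemptID> it = maps.iterator();
  while (it.hasNext()) {
    TaskAttemptID map = it.next();
    if (copyMapOutput(map)) {
      // Fetch succeeded; drop this attempt from the worklist.
      it.remove();
    } else {
      // copyMapOutput returned false, i.e. the merger signalled WAIT;
      // stop and let an in-memory merge free reservation space before retrying.
      break;
    }
  }
}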