private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path,
    List<String> keys, List<String> values) throws IOException {
  FSDataInputStream in = CryptoUtils.wrapIfNecessary(conf, fs.open(path));

  IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in,
      fs.getFileStatus(path).getLen(), null, null);
  DataInputBuffer keyBuff = new DataInputBuffer();
  DataInputBuffer valueBuff = new DataInputBuffer();
  Text key = new Text();
  Text value = new Text();
  while (reader.nextRawKey(keyBuff)) {
    key.readFields(keyBuff);
    keys.add(key.toString());
    reader.nextRawValue(valueBuff);
    value.readFields(valueBuff);
    values.add(value.toString());
  }
}
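For context, a minimal write-side sketch that produces an IFile the helper above can read back. The method name, path, and record contents are hypothetical; the 6-argument IFile.Writer constructor is the same one spillSingleRecord uses below, and CryptoUtils.wrapIfNecessary leaves the stream untouched unless encrypted intermediate data is enabled.

// Hedged sketch (hypothetical helper): write an IFile whose stream is
// encrypted only when encrypted spill is enabled, mirroring the reader above.
private void writeOnDiskMapOutput(Configuration conf, FileSystem fs, Path path)
    throws IOException {
  FSDataOutputStream rawOut = fs.create(path);
  try {
    FSDataOutputStream out = CryptoUtils.wrapIfNecessary(conf, rawOut);
    IFile.Writer<Text, Text> writer =
        new IFile.Writer<Text, Text>(conf, out, Text.class, Text.class,
            null /* no codec */, null /* no counter */);
    writer.append(new Text("key"), new Text("value"));
    writer.close(); // does not close the underlying raw stream
  } finally {
    rawOut.close();
  }
}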
void init(Counters.Counter readsCounter) throws IOException {
  if (reader == null) {
    FSDataInputStream in = fs.open(file);
    in.seek(segmentOffset);
    in = CryptoUtils.wrapIfNecessary(conf, in);
    reader = new Reader<K, V>(conf, in,
        segmentLength - CryptoUtils.cryptoPadding(conf),
        codec, readsCounter);
  }

  if (mapOutputsCounter != null) {
    mapOutputsCounter.increment(1);
  }
}
private Writer<K,V> createSpillFile() throws IOException {
  Path tmp =
      new Path(MRJobConfig.OUTPUT + "/backup_" + tid.getId() + "_"
          + (spillNumber++) + ".out");

  LOG.info("Created file: " + tmp);

  file = lDirAlloc.getLocalPathForWrite(tmp.toUri().getPath(), -1, conf);
  FSDataOutputStream out = fs.create(file);
  out = CryptoUtils.wrapIfNecessary(conf, out);
  return new Writer<K, V>(conf, out, null, null, null, null, true);
}
@VisibleForTesting
OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                MergeManagerImpl<K,V> merger, long size,
                JobConf conf,
                MapOutputFile mapOutputFile,
                int fetcher, boolean primaryMapOutput,
                FileSystem fs, Path outputPath) throws IOException {
  super(mapId, size, primaryMapOutput);
  this.fs = fs;
  this.merger = merger;
  this.outputPath = outputPath;
  tmpOutputPath = getTempPath(outputPath, fetcher);
  disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
  this.conf = conf;
}
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  // The index lengths include the crypto padding added at spill time;
  // subtract it before reserving merge memory.
  compressedLength -= CryptoUtils.cryptoPadding(job);
  decompressedLength -= CryptoUtils.cryptoPadding(job);

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map " +
           mapOutput.getMapId() + " decomp: " +
           decompressedLength + " len: " + compressedLength + " to " +
           mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);

  inStream = CryptoUtils.wrapIfNecessary(job, inStream);

  try {
    inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
        decompressedLength, metrics, reporter);
  } finally {
    try {
      inStream.close();
    } catch (IOException ioe) {
      LOG.warn("IOException closing inputstream from map output: "
          + ioe.toString());
    }
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
OnDiskMapOutput(TaskAttemptID mapId,
                MergeManagerImpl<K, V> merger, long size,
                JobConf conf,
                int fetcher, boolean primaryMapOutput,
                FileSystem fs, Path outputPath) throws IOException {
  super(conf, merger, mapId, size, primaryMapOutput);
  this.fs = fs;
  this.outputPath = outputPath;
  tmpOutputPath = getTempPath(outputPath, fetcher);
  disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
}
/**
 * Handles the degenerate case where serialization fails to fit in
 * the in-memory buffer, so we must spill the record from collect
 * directly to a spill file. Consider this "losing".
 */
private void spillSingleRecord(final K key, final V value,
                               int partition) throws IOException {
  long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    out = rfs.create(filename);

    // we don't run the combiner for a single record
    IndexRecord rec = new IndexRecord();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        // Create a new codec, don't care!
        FSDataOutputStream partitionOut =
            CryptoUtils.wrapIfNecessary(job, out);
        writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass,
            codec, spilledRecordsCounter);

        if (i == partition) {
          final long recordStart = out.getPos();
          writer.append(key, value);
          // Note that our map byte count will not be accurate with
          // compression
          mapOutputByteCounter.increment(out.getPos() - recordStart);
        }
        writer.close();

        // record offsets; the recorded lengths include the crypto padding,
        // which the fetch side (copyMapOutput) subtracts back out
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
        rec.partLength =
            writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
        spillRec.putIndex(rec, i);

        writer = null;
      } catch (IOException e) {
        if (null != writer) writer.close();
        throw e;
      }
    }

    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
@Override
public void merge(List<CompressAwarePath> inputs) throws IOException {
  // sanity check
  if (inputs == null || inputs.isEmpty()) {
    LOG.info("No ondisk files to merge...");
    return;
  }

  long approxOutputSize = 0;
  int bytesPerSum = jobConf.getInt("io.bytes.per.checksum", 512);

  LOG.info("OnDiskMerger: We have " + inputs.size() +
      " map outputs on disk. Triggering merge...");

  // 1. Prepare the list of files to be merged.
  for (CompressAwarePath file : inputs) {
    approxOutputSize += localFS.getFileStatus(file).getLen();
  }

  // add the checksum length
  approxOutputSize +=
      ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

  // 2. Start the on-disk merge process
  Path outputPath =
      localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),
          approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out =
      CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator iter = null;
  CompressAwarePath compressAwarePath;
  Path tmpDir = new Path(reduceId.toString());
  try {
    iter = Merger.merge(jobConf, rfs,
        (Class<K>) jobConf.getMapOutputKeyClass(),
        (Class<V>) jobConf.getMapOutputValueClass(),
        codec, inputs.toArray(new Path[inputs.size()]), true, ioSortFactor,
        tmpDir, (RawComparator<K>) jobConf.getOutputKeyComparator(),
        reporter, spilledRecordsCounter, null, mergedMapOutputsCounter, null);

    Merger.writeFile(iter, writer, reporter, jobConf);
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());
  } catch (IOException e) {
    localFS.delete(outputPath, true);
    throw e;
  }

  closeOnDiskFile(compressAwarePath);

  LOG.info(reduceId +
      " Finished merging " + inputs.size() +
      " map output files on disk of total-size " +
      approxOutputSize + "." +
      " Local output file is " + outputPath + " of size " +
      localFS.getFileStatus(outputPath).getLen());
}
public Job(JobID jobid, String jobSubmitDir) throws IOException {
  this.systemJobDir = new Path(jobSubmitDir);
  this.systemJobFile = new Path(systemJobDir, "job.xml");
  this.id = jobid;
  JobConf conf = new JobConf(systemJobFile);
  this.localFs = FileSystem.getLocal(conf);
  String user = UserGroupInformation.getCurrentUser().getShortUserName();
  this.localJobDir = localFs.makeQualified(new Path(
      new Path(conf.getLocalPath(jobDir), user), jobid.toString()));
  this.localJobFile = new Path(this.localJobDir, id + ".xml");

  // Manage the distributed cache. If there are files to be copied,
  // this will trigger localFile to be re-written.
  localDistributedCacheManager = new LocalDistributedCacheManager();
  localDistributedCacheManager.setup(conf);

  // Write out configuration file. Instead of copying it from
  // systemJobFile, we re-write it, since setup(), above, may have
  // updated it.
  OutputStream out = localFs.create(localJobFile);
  try {
    conf.writeXml(out);
  } finally {
    out.close();
  }
  this.job = new JobConf(localJobFile);

  // Job (the current object) is a Thread, so we wrap its class loader.
  if (localDistributedCacheManager.hasLocalClasspaths()) {
    setContextClassLoader(localDistributedCacheManager.makeClassLoader(
        getContextClassLoader()));
  }

  profile = new JobProfile(job.getUser(), id, systemJobFile.toString(),
      "http://localhost:8080/", job.getJobName());
  status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING,
      profile.getUser(), profile.getJobName(), profile.getJobFile(),
      profile.getURL().toString());

  jobs.put(id, this);

  if (CryptoUtils.isEncryptedSpillEnabled(job)) {
    try {
      int keyLen = conf.getInt(
          MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
          MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS);
      KeyGenerator keyGen =
          KeyGenerator.getInstance(INTERMEDIATE_DATA_ENCRYPTION_ALGO);
      keyGen.init(keyLen);
      Credentials creds =
          UserGroupInformation.getCurrentUser().getCredentials();
      TokenCache.setEncryptedSpillKey(keyGen.generateKey().getEncoded(),
          creds);
      UserGroupInformation.getCurrentUser().addCredentials(creds);
    } catch (NoSuchAlgorithmException e) {
      throw new IOException("Error generating encrypted spill key", e);
    }
  }

  this.start();
}
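On the task side, the generated key travels with the job credentials; a minimal sketch of recovering it, assuming the TokenCache accessor that pairs with setEncryptedSpillKey above (the helper name is hypothetical):

// Hedged sketch: recover the spill key stored above. CryptoUtils reads it
// from the current user's credentials whenever it needs to encrypt a stream.
private static byte[] fetchSpillKey() throws IOException {
  Credentials creds = UserGroupInformation.getCurrentUser().getCredentials();
  return TokenCache.getEncryptedSpillKey(creds); // null if never set
}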
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  long compressedLength = ir.partLength;
  long decompressedLength = ir.rawLength;

  compressedLength -= CryptoUtils.cryptoPadding(job);
  decompressedLength -= CryptoUtils.cryptoPadding(job);

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map " +
           mapOutput.getMapId() + " decomp: " +
           decompressedLength + " len: " + compressedLength + " to " +
           mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);
  try {
    inStream = CryptoUtils.wrapIfNecessary(job, inStream);
    inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
        decompressedLength, metrics, reporter);
  } finally {
    IOUtils.cleanup(LOG, inStream);
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
@Override
public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  // Name this output file the same as the first file in the current
  // list of in-memory files: that name is guaranteed to be absent on
  // disk, so we don't overwrite a previously created spill. We also
  // need to create the output file now, since it is not guaranteed
  // that this file will be present after merge is called (empty files
  // are deleted as soon as they are seen in the merge method).

  // figure out the mapId
  TaskAttemptID mapId = inputs.get(0).getMapId();
  TaskID mapTaskId = mapId.getTaskID();

  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize =
      createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  Path outputPath =
      mapOutputFile.getInputFileForWrite(mapTaskId,
          mergeOutputSize).suffix(Task.MERGED_OUTPUT_PREFIX);

  FSDataOutputStream out =
      CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath));
  Writer<K, V> writer = new Writer<K, V>(jobConf, out,
      (Class<K>) jobConf.getMapOutputKeyClass(),
      (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true);

  RawKeyValueIterator rIter = null;
  CompressAwarePath compressAwarePath;
  try {
    LOG.info("Initiating in-memory merge with " + noInMemorySegments +
             " segments...");

    rIter = Merger.merge(jobConf, rfs,
        (Class<K>) jobConf.getMapOutputKeyClass(),
        (Class<V>) jobConf.getMapOutputValueClass(),
        inMemorySegments, inMemorySegments.size(),
        new Path(reduceId.toString()),
        (RawComparator<K>) jobConf.getOutputKeyComparator(),
        reporter, spilledRecordsCounter, null, null);

    if (null == combinerClass) {
      Merger.writeFile(rIter, writer, reporter, jobConf);
    } else {
      combineCollector.setWriter(writer);
      combineAndSpill(rIter, reduceCombineInputCounter);
    }
    writer.close();
    compressAwarePath = new CompressAwarePath(outputPath,
        writer.getRawLength(), writer.getCompressedLength());

    LOG.info(reduceId +
        " Merge of the " + noInMemorySegments +
        " files in-memory complete." +
        " Local file is " + outputPath + " of size " +
        localFS.getFileStatus(outputPath).getLen());
  } catch (IOException e) {
    // make sure that we delete the on-disk file that we created
    // earlier when we invoked cloneFileAttributes
    localFS.delete(outputPath, true);
    throw e;
  }

  // Note the output of the merge
  closeOnDiskFile(compressAwarePath);
}
/**
 * Retrieve the map output of a single map task
 * and send it to the merger.
 */
private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
  // Figure out where the map task stored its output.
  Path mapOutputFileName = localMapFiles.get(mapTaskId).getOutputFile();
  Path indexFileName = mapOutputFileName.suffix(".index");

  // Read its index to determine the location of our split
  // and its size.
  SpillRecord sr = new SpillRecord(indexFileName, job);
  IndexRecord ir = sr.getIndex(reduce);

  // The index lengths include the crypto padding added at spill time;
  // subtract it before reserving merge memory (as in the variants above).
  long compressedLength = ir.partLength - CryptoUtils.cryptoPadding(job);
  long decompressedLength = ir.rawLength - CryptoUtils.cryptoPadding(job);

  // Get the location for the map output - either in-memory or on-disk
  MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
      id);

  // Check if we can shuffle *now* ...
  if (mapOutput == null) {
    LOG.info("fetcher#" + id + " - MergeManager returned Status.WAIT ...");
    return false;
  }

  // Go!
  LOG.info("localfetcher#" + id + " about to shuffle output of map " +
           mapOutput.getMapId() + " decomp: " +
           decompressedLength + " len: " + compressedLength + " to " +
           mapOutput.getDescription());

  // now read the file, seek to the appropriate section, and send it.
  FileSystem localFs = FileSystem.getLocal(job).getRaw();
  FSDataInputStream inStream = localFs.open(mapOutputFileName);
  inStream = CryptoUtils.wrapIfNecessary(job, inStream);

  try {
    // Skip the crypto prefix as well when seeking to our partition.
    inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
    mapOutput.shuffle(LOCALHOST, inStream, compressedLength,
        decompressedLength, metrics, reporter);
  } finally {
    try {
      inStream.close();
    } catch (IOException ioe) {
      LOG.warn("IOException closing inputstream from map output: "
          + ioe.toString());
    }
  }

  scheduler.copySucceeded(mapTaskId, LOCALHOST, compressedLength, 0, 0,
      mapOutput);
  return true; // successful fetch.
}
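Finally, none of the wrapIfNecessary calls above encrypt anything unless the job opts in. A minimal configuration sketch, assuming the standard MRJobConfig constants (the key-size constant is the same one the Job constructor above reads):

// Hedged sketch: opt a job in to encrypted intermediate data, so the
// wrapIfNecessary calls above actually wrap their streams with crypto.
JobConf job = new JobConf();
job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
// Key length in bits; omit to use the MRJobConfig default.
job.setInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 128);
assert CryptoUtils.isEncryptedSpillEnabled(job);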