@SuppressWarnings("unchecked") public RawKeyValueIterator finish() throws Throwable { // merge config params Class<K> keyClass = (Class<K>) jobConf.getMapOutputKeyClass(); Class<V> valueClass = (Class<V>) jobConf.getMapOutputValueClass(); final RawComparator<K> comparator = (RawComparator<K>) jobConf.getOutputKeyComparator(); // Wait for on-going merges to complete merger.close(); LOG.info("finalMerge called with " + segmentsToBeMerged.size() + " on-disk map-outputs"); List<Segment<K, V>> segments = new ArrayList<Segment<K, V>>(); long onDiskBytes = 0; for (Segment<K, V> segment : segmentsToBeMerged) { long fileLength = segment.getLength(); onDiskBytes += fileLength; LOG.debug("Disk file: " + segment + " Length is " + fileLength); segments.add(segment); } segmentsToBeMerged.clear(); LOG.info("Merging " + segmentsToBeMerged.size() + " files, " + onDiskBytes + " bytes from disk"); Collections.sort(segments, new Comparator<Segment<K, V>>() { public int compare(Segment<K, V> o1, Segment<K, V> o2) { if (o1.getLength() == o2.getLength()) { return 0; } return o1.getLength() < o2.getLength() ? -1 : 1; } }); return Merger.merge(jobConf, lustrefs, keyClass, valueClass, segments, segments.size(), mergeTempDir, comparator, reporter, spilledRecordsCounter, null, null); }
@Override public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException { if (inputs == null || inputs.size() == 0) { return; } TaskAttemptID dummyMapId = inputs.get(0).getMapId(); List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>(); long mergeOutputSize = createInMemorySegments(inputs, inMemorySegments, 0); int noInMemorySegments = inMemorySegments.size(); InMemoryMapOutput<K, V> mergedMapOutputs = unconditionalReserve(dummyMapId, mergeOutputSize, false); Writer<K, V> writer = new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream()); LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments + " segments of total-size: " + mergeOutputSize); RawKeyValueIterator rIter = Merger.merge(jobConf, rfs, (Class<K>)jobConf.getMapOutputKeyClass(), (Class<V>)jobConf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(reduceId.toString()), (RawComparator<K>)jobConf.getOutputKeyComparator(), reporter, null, null, null); Merger.writeFile(rIter, writer, reporter, jobConf); writer.close(); LOG.info(reduceId + " Memory-to-Memory merge of the " + noInMemorySegments + " files in-memory complete."); // Note the output of the merge closeInMemoryMergedFile(mergedMapOutputs); }
/**
 * Merges the given segments with a merge factor argument of 2 and checks
 * that the resulting iterator reports a progress of 1.0.
 *
 * <p>The progress value is a float, so it is compared with an explicit
 * tolerance. NOTE(review): with {@code org.junit.Assert} the two-argument
 * floating-point {@code assertEquals} is deprecated and fails
 * unconditionally; confirm which {@code Assert} class this file imports.
 *
 * @param segments the segments handed to {@code Merger.merge}
 * @throws IOException propagated from {@code Merger.merge}
 */
@SuppressWarnings( { "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
    List<Segment<Text, Text>> segments) throws IOException {
  Path tmpDir = new Path("localpath");
  Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
  Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
  RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
  Counter readsCounter = new Counter();
  Counter writesCounter = new Counter();
  Progress mergePhase = new Progress();
  RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
      valueClass, segments, 2, tmpDir, comparator, getReporter(),
      readsCounter, writesCounter, mergePhase);

  // Compare floats within a small tolerance rather than exactly.
  final float epsilon = 0.00001f;
  Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
}
@Override public void merge(List<MapOutput<K, V>> inputs) throws IOException { if (inputs == null || inputs.size() == 0) { return; } TaskAttemptID dummyMapId = inputs.get(0).getMapId(); List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>(); long mergeOutputSize = createInMemorySegments(inputs, inMemorySegments, 0); int noInMemorySegments = inMemorySegments.size(); MapOutput<K, V> mergedMapOutputs = unconditionalReserve(dummyMapId, mergeOutputSize, false); Writer<K, V> writer = new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream()); LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments + " segments of total-size: " + mergeOutputSize); RawKeyValueIterator rIter = Merger.merge(jobConf, rfs, (Class<K>)jobConf.getMapOutputKeyClass(), (Class<V>)jobConf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(reduceId.toString()), (RawComparator<K>)jobConf.getOutputKeyComparator(), reporter, null, null, null); Merger.writeFile(rIter, writer, reporter, jobConf); writer.close(); LOG.info(reduceId + " Memory-to-Memory merge of the " + noInMemorySegments + " files in-memory complete."); // Note the output of the merge closeInMemoryMergedFile(mergedMapOutputs); }
@SuppressWarnings("unchecked") @Override public void merge(List<Segment<K,V>> segments) throws IOException { // sanity check if (segments == null || segments.isEmpty()) { LOG.info("No ondisk files to merge..."); return; } Class<K> keyClass = (Class<K>) jobConf.getMapOutputKeyClass(); Class<V> valueClass = (Class<V>) jobConf.getMapOutputValueClass(); final RawComparator<K> comparator = (RawComparator<K>) jobConf.getOutputKeyComparator(); long approxOutputSize = 0; int bytesPerSum = jobConf.getInt("io.bytes.per.checksum", 512); LOG.info("OnDiskMerger: We have " + segments.size() + " map outputs on disk. Triggering merge..."); // 1. Prepare the list of files to be merged. for (Segment<K,V> segment : segments) { approxOutputSize += segment.getLength(); } // add the checksum length approxOutputSize += ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum); // 2. Start the on-disk merge process Path outputPath = new Path(reduceDir, "file-" + (numPasses++)).suffix(Task.MERGED_OUTPUT_PREFIX); Writer<K, V> writer = new Writer<K, V>(jobConf, lustrefs.create(outputPath), (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true); RawKeyValueIterator iter = null; try { iter = Merger.merge(jobConf, lustrefs, keyClass, valueClass, segments, ioSortFactor, mergeTempDir, comparator, reporter, spilledRecordsCounter, mergedMapOutputsCounter, null); Merger.writeFile(iter, writer, reporter, jobConf); writer.close(); } catch (IOException e) { lustrefs.delete(outputPath, true); throw e; } addSegmentToMerge(new Segment<K, V>(jobConf, lustrefs, outputPath, codec, false, null)); LOG.info(reduceId + " Finished merging " + segments.size() + " map output files on disk of total-size " + approxOutputSize + "." + " Local output file is " + outputPath + " of size " + lustrefs.getFileStatus(outputPath).getLen()); }
@Override public void merge(List<CompressAwarePath> inputs) throws IOException { // sanity check if (inputs == null || inputs.isEmpty()) { LOG.info("No ondisk files to merge..."); return; } long approxOutputSize = 0; int bytesPerSum = jobConf.getInt("io.bytes.per.checksum", 512); LOG.info("OnDiskMerger: We have " + inputs.size() + " map outputs on disk. Triggering merge..."); // 1. Prepare the list of files to be merged. for (CompressAwarePath file : inputs) { approxOutputSize += localFS.getFileStatus(file).getLen(); } // add the checksum length approxOutputSize += ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum); // 2. Start the on-disk merge process Path outputPath = localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX); FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath)); Writer<K, V> writer = new Writer<K, V>(jobConf, out, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true); RawKeyValueIterator iter = null; CompressAwarePath compressAwarePath; Path tmpDir = new Path(reduceId.toString()); try { iter = Merger.merge(jobConf, rfs, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, inputs.toArray(new Path[inputs.size()]), true, ioSortFactor, tmpDir, (RawComparator<K>) jobConf.getOutputKeyComparator(), reporter, spilledRecordsCounter, null, mergedMapOutputsCounter, null); Merger.writeFile(iter, writer, reporter, jobConf); writer.close(); compressAwarePath = new CompressAwarePath(outputPath, writer.getRawLength(), writer.getCompressedLength()); } catch (IOException e) { localFS.delete(outputPath, true); throw e; } closeOnDiskFile(compressAwarePath); LOG.info(reduceId + " Finished merging " + inputs.size() + " map output files on disk of total-size " + approxOutputSize + "." 
+ " Local output file is " + outputPath + " of size " + localFS.getFileStatus(outputPath).getLen()); }
@SuppressWarnings( { "unchecked" }) public void testMergeShouldReturnProperProgress( List<Segment<Text, Text>> segments) throws IOException { Path tmpDir = new Path("localpath"); Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass(); Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass(); RawComparator<Text> comparator = jobConf.getOutputKeyComparator(); Counter readsCounter = new Counter(); Counter writesCounter = new Counter(); Progress mergePhase = new Progress(); RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass, valueClass, segments, 2, tmpDir, comparator, getReporter(), readsCounter, writesCounter, mergePhase); final float epsilon = 0.00001f; // Reading 6 keys total, 3 each in 2 segments, so each key read moves the // progress forward 1/6th of the way. Initially the first keys from each // segment have been read as part of the merge setup, so progress = 2/6. Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon); // The first next() returns one of the keys already read during merge setup Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon); // Subsequent next() calls should read one key and move progress Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon); Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon); // At this point we've exhausted all of the keys in one segment // so getting the next key will return the already cached key from the // other segment Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon); // Subsequent next() calls should read one key and move progress Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon); Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon); // 
Now there should be no more input Assert.assertFalse(mergeQueue.next()); Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon); Assert.assertTrue(mergeQueue.getKey() == null); Assert.assertEquals(0, mergeQueue.getValue().getData().length); }
@Override public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException { if (inputs == null || inputs.size() == 0) { return; } //name this output file same as the name of the first file that is //there in the current list of inmem files (this is guaranteed to //be absent on the disk currently. So we don't overwrite a prev. //created spill). Also we need to create the output file now since //it is not guaranteed that this file will be present after merge //is called (we delete empty files as soon as we see them //in the merge method) //figure out the mapId TaskAttemptID mapId = inputs.get(0).getMapId(); TaskID mapTaskId = mapId.getTaskID(); List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>(); long mergeOutputSize = createInMemorySegments(inputs, inMemorySegments,0); int noInMemorySegments = inMemorySegments.size(); Path outputPath = mapOutputFile.getInputFileForWrite(mapTaskId, mergeOutputSize).suffix( Task.MERGED_OUTPUT_PREFIX); FSDataOutputStream out = CryptoUtils.wrapIfNecessary(jobConf, rfs.create(outputPath)); Writer<K, V> writer = new Writer<K, V>(jobConf, out, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, null, true); RawKeyValueIterator rIter = null; CompressAwarePath compressAwarePath; try { LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments..."); rIter = Merger.merge(jobConf, rfs, (Class<K>)jobConf.getMapOutputKeyClass(), (Class<V>)jobConf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(reduceId.toString()), (RawComparator<K>)jobConf.getOutputKeyComparator(), reporter, spilledRecordsCounter, null, null); if (null == combinerClass) { Merger.writeFile(rIter, writer, reporter, jobConf); } else { combineCollector.setWriter(writer); combineAndSpill(rIter, reduceCombineInputCounter); } writer.close(); compressAwarePath = new CompressAwarePath(outputPath, writer.getRawLength(), writer.getCompressedLength()); LOG.info(reduceId + 
" Merge of the " + noInMemorySegments + " files in-memory complete." + " Local file is " + outputPath + " of size " + localFS.getFileStatus(outputPath).getLen()); } catch (IOException e) { //make sure that we delete the ondisk file that we created //earlier when we invoked cloneFileAttributes localFS.delete(outputPath, true); throw e; } // Note the output of the merge closeOnDiskFile(compressAwarePath); }
@Override public void merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException { if (inputs == null || inputs.size() == 0) { return; } //name this output file same as the name of the first file that is //there in the current list of inmem files (this is guaranteed to //be absent on the disk currently. So we don't overwrite a prev. //created spill). Also we need to create the output file now since //it is not guaranteed that this file will be present after merge //is called (we delete empty files as soon as we see them //in the merge method) //figure out the mapId TaskAttemptID mapId = inputs.get(0).getMapId(); TaskID mapTaskId = mapId.getTaskID(); List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>(); long mergeOutputSize = createInMemorySegments(inputs, inMemorySegments,0); int noInMemorySegments = inMemorySegments.size(); Path outputPath = mapOutputFile.getInputFileForWrite(mapTaskId, mergeOutputSize).suffix( Task.MERGED_OUTPUT_PREFIX); Writer<K,V> writer = new Writer<K,V>(jobConf, rfs, outputPath, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, null); RawKeyValueIterator rIter = null; CompressAwarePath compressAwarePath; try { LOG.info("Initiating in-memory merge with " + noInMemorySegments + " segments..."); rIter = Merger.merge(jobConf, rfs, (Class<K>)jobConf.getMapOutputKeyClass(), (Class<V>)jobConf.getMapOutputValueClass(), inMemorySegments, inMemorySegments.size(), new Path(reduceId.toString()), (RawComparator<K>)jobConf.getOutputKeyComparator(), reporter, spilledRecordsCounter, null, null); if (null == combinerClass) { Merger.writeFile(rIter, writer, reporter, jobConf); } else { combineCollector.setWriter(writer); combineAndSpill(rIter, reduceCombineInputCounter); } writer.close(); compressAwarePath = new CompressAwarePath(outputPath, writer.getRawLength(), writer.getCompressedLength()); LOG.info(reduceId + " Merge of the " + noInMemorySegments + " files in-memory complete." 
+ " Local file is " + outputPath + " of size " + localFS.getFileStatus(outputPath).getLen()); } catch (IOException e) { //make sure that we delete the ondisk file that we created //earlier when we invoked cloneFileAttributes localFS.delete(outputPath, true); throw e; } // Note the output of the merge closeOnDiskFile(compressAwarePath); }
@Override public void merge(List<CompressAwarePath> inputs) throws IOException { // sanity check if (inputs == null || inputs.isEmpty()) { LOG.info("No ondisk files to merge..."); return; } long approxOutputSize = 0; int bytesPerSum = jobConf.getInt("io.bytes.per.checksum", 512); LOG.info("OnDiskMerger: We have " + inputs.size() + " map outputs on disk. Triggering merge..."); // 1. Prepare the list of files to be merged. for (CompressAwarePath file : inputs) { approxOutputSize += localFS.getFileStatus(file).getLen(); } // add the checksum length approxOutputSize += ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum); // 2. Start the on-disk merge process Path outputPath = localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(), approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX); Writer<K,V> writer = new Writer<K,V>(jobConf, rfs, outputPath, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, null); RawKeyValueIterator iter = null; CompressAwarePath compressAwarePath; Path tmpDir = new Path(reduceId.toString()); try { iter = Merger.merge(jobConf, rfs, (Class<K>) jobConf.getMapOutputKeyClass(), (Class<V>) jobConf.getMapOutputValueClass(), codec, inputs.toArray(new Path[inputs.size()]), true, ioSortFactor, tmpDir, (RawComparator<K>) jobConf.getOutputKeyComparator(), reporter, spilledRecordsCounter, null, mergedMapOutputsCounter, null); Merger.writeFile(iter, writer, reporter, jobConf); writer.close(); compressAwarePath = new CompressAwarePath(outputPath, writer.getRawLength(), writer.getCompressedLength()); } catch (IOException e) { localFS.delete(outputPath, true); throw e; } closeOnDiskFile(compressAwarePath); LOG.info(reduceId + " Finished merging " + inputs.size() + " map output files on disk of total-size " + approxOutputSize + "." + " Local output file is " + outputPath + " of size " + localFS.getFileStatus(outputPath).getLen()); }
@SuppressWarnings( { "unchecked" }) public void testMergeShouldReturnProperProgress( List<Segment<Text, Text>> segments) throws IOException { Path tmpDir = new Path("localpath"); Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass(); Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass(); RawComparator<Text> comparator = jobConf.getOutputKeyComparator(); Counter readsCounter = new Counter(); Counter writesCounter = new Counter(); Progress mergePhase = new Progress(); RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass, valueClass, segments, 2, tmpDir, comparator, getReporter(), readsCounter, writesCounter, mergePhase); final float epsilon = 0.00001f; // Reading 6 keys total, 3 each in 2 segments, so each key read moves the // progress forward 1/6th of the way. Initially the first keys from each // segment have been read as part of the merge setup, so progress = 2/6. Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon); // The first next() returns one of the keys already read during merge setup Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(2/6.0f, mergeQueue.getProgress().get(), epsilon); // Subsequent next() calls should read one key and move progress Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(3/6.0f, mergeQueue.getProgress().get(), epsilon); Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon); // At this point we've exhausted all of the keys in one segment // so getting the next key will return the already cached key from the // other segment Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(4/6.0f, mergeQueue.getProgress().get(), epsilon); // Subsequent next() calls should read one key and move progress Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(5/6.0f, mergeQueue.getProgress().get(), epsilon); Assert.assertTrue(mergeQueue.next()); Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon); // 
Now there should be no more input Assert.assertFalse(mergeQueue.next()); Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon); }