@SuppressWarnings({ "deprecation", "unchecked" })
public void testMergeShouldReturnProperProgress(
    List<Segment<Text, Text>> segments) throws IOException {
  Configuration conf = new Configuration();
  JobConf jobConf = new JobConf();
  FileSystem fs = FileSystem.getLocal(conf);
  Path tmpDir = new Path("localpath");
  Class<Text> keyClass = (Class<Text>) jobConf.getMapOutputKeyClass();
  Class<Text> valueClass = (Class<Text>) jobConf.getMapOutputValueClass();
  RawComparator<Text> comparator = jobConf.getOutputKeyComparator();
  Counter readsCounter = new Counter();
  Counter writesCounter = new Counter();
  RawKeyValueIterator mergeQueue = Merger.merge(conf, fs, keyClass,
      valueClass, segments, 2, tmpDir, comparator, getReporter(),
      readsCounter, writesCounter);
  Assert.assertEquals(1.0f, mergeQueue.getProgress().get());
}
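// A minimal sketch of the getReporter() helper the test above relies on;
// the real test may construct it differently (e.g., as a mock). Reporter.NULL
// is the no-op Reporter shipped with org.apache.hadoop.mapred, which suffices
// here because Merger.merge only needs a Progressable to report progress to.
private Reporter getReporter() {
  return Reporter.NULL;
}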
public synchronized void flush() throws IOException, ClassNotFoundException,
    InterruptedException {
  if (numSpills > 0 && lastSpillInMem) {
    // If there is already at least one spill, we can try to hold this last
    // spill in memory.
    sortReduceParts();
    for (int i = 0; i < partitions; i++) {
      this.inMemorySegments[i] =
          new Segment<K, V>(this.reducePartitions[i].getIReader(), true);
    }
    hasInMemorySpill = true;
  } else {
    sortAndSpill();
  }
  long mergeStartMilli = System.currentTimeMillis();
  ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
  long mergeStart = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mergeParts();
  long mergeEndMilli = System.currentTimeMillis();
  ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
  long mergeEnd = task.jmxThreadInfoTracker.getTaskCPUTime("MAIN_TASK");
  mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
      mergeEndMilli - mergeStartMilli);
  mapSpillSortCounter.incJVMCPUMerge(mergeStart, mergeEnd);
}
public synchronized void flush() throws IOException, ClassNotFoundException,
    InterruptedException {
  if (numSpills > 0 && lastSpillInMem) {
    // If there is already at least one spill, we can try to hold this last
    // spill in memory.
    sortReduceParts();
    for (int i = 0; i < partitions; i++) {
      this.inMemorySegments[i] =
          new Segment<K, V>(this.reducePartitions[i].getIReader(), true);
    }
    hasInMemorySpill = true;
  } else {
    sortAndSpill();
  }
  long mergeStartMilli = System.currentTimeMillis();
  ProcResourceValues mergeStartProcVals = task.getCurrentProcResourceValues();
  mergeParts();
  long mergeEndMilli = System.currentTimeMillis();
  ProcResourceValues mergeEndProcVals = task.getCurrentProcResourceValues();
  mapSpillSortCounter.incMergeCounters(mergeStartProcVals, mergeEndProcVals,
      mergeEndMilli - mergeStartMilli);
}
private int createInMemSegments(List<Segment<K, V>> inMemorySegments, int num)
    throws IOException {
  int totalSize = 0;
  synchronized (mapOutputsFilesInMemory) {
    while (num > 0) {
      MapOutput mo = mapOutputsFilesInMemory.remove(0);
      totalSize += mo.data.getLen();
      Reader<K, V> reader =
          new Reader<K, V>(conf, ramManager, mo.data, mo.data.getLen());
      Segment<K, V> segment = new Segment<K, V>(reader, true);
      inMemorySegments.add(segment);
      num--;
    }
  }
  return totalSize;
}
@Override
public int compare(Segment<K, V> seg1, Segment<K, V> seg2) {
  if (seg1.getLength() < seg2.getLength()) {
    return -1;
  } else if (seg1.getLength() > seg2.getLength()) {
    return 1;
  }
  // Equal lengths: fall back to the segment path so the ordering is total
  // and deterministic.
  return SharedFsPlugins.getSegmentPath(seg1)
      .compareTo(SharedFsPlugins.getSegmentPath(seg2));
}
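// A usage sketch, not part of the original code: because the comparator above
// breaks length ties by segment path, sorting with it yields a stable, total
// order, so repeated runs choose the same merge order. The method name and
// parameter names below are hypothetical and only illustrate the call.
static <K, V> void sortForMerge(List<Segment<K, V>> segments,
    Comparator<Segment<K, V>> bySizeThenPath) {
  Collections.sort(segments, bySizeThenPath);
}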
@SuppressWarnings("unchecked") public RawKeyValueIterator finish() throws Throwable { // merge config params Class<K> keyClass = (Class<K>) jobConf.getMapOutputKeyClass(); Class<V> valueClass = (Class<V>) jobConf.getMapOutputValueClass(); final RawComparator<K> comparator = (RawComparator<K>) jobConf.getOutputKeyComparator(); // Wait for on-going merges to complete merger.close(); LOG.info("finalMerge called with " + segmentsToBeMerged.size() + " on-disk map-outputs"); List<Segment<K, V>> segments = new ArrayList<Segment<K, V>>(); long onDiskBytes = 0; for (Segment<K, V> segment : segmentsToBeMerged) { long fileLength = segment.getLength(); onDiskBytes += fileLength; LOG.debug("Disk file: " + segment + " Length is " + fileLength); segments.add(segment); } segmentsToBeMerged.clear(); LOG.info("Merging " + segmentsToBeMerged.size() + " files, " + onDiskBytes + " bytes from disk"); Collections.sort(segments, new Comparator<Segment<K, V>>() { public int compare(Segment<K, V> o1, Segment<K, V> o2) { if (o1.getLength() == o2.getLength()) { return 0; } return o1.getLength() < o2.getLength() ? -1 : 1; } }); return Merger.merge(jobConf, lustrefs, keyClass, valueClass, segments, segments.size(), mergeTempDir, comparator, reporter, spilledRecordsCounter, null, null); }
public void mark() throws IOException {
  // We read one KV pair in advance in hasNext. If hasNext has read the next
  // KV pair from a new segment, but the user has not called next() for that
  // KV, then reset readSegmentIndex to the previous segment.
  if (nextKVOffset == 0) {
    assert (readSegmentIndex != 0);
    assert (currentKVOffset != 0);
    readSegmentIndex--;
  }

  // Just drop segments before the current active segment.
  int i = 0;
  Iterator<Segment<K, V>> itr = segmentList.iterator();
  while (itr.hasNext()) {
    Segment<K, V> s = itr.next();
    if (i == readSegmentIndex) {
      break;
    }
    s.close();
    itr.remove();
    i++;
    LOG.debug("Dropping a segment");
  }

  // firstSegmentOffset is the offset in the current segment from where we
  // need to start reading on the next reset.
  firstSegmentOffset = currentKVOffset;
  readSegmentIndex = 0;
  LOG.debug("Setting the FirstSegmentOffset to " + currentKVOffset);
}
public void reset() throws IOException {
  // Create a new segment for the previously written records only if we are
  // not already in reset mode.
  if (!inReset) {
    if (fileCache.isActive) {
      fileCache.createInDiskSegment();
    } else {
      memCache.createInMemorySegment();
    }
  }

  inReset = true;

  // Reset the segments to the correct position from where the next read
  // should begin.
  for (int i = 0; i < segmentList.size(); i++) {
    Segment<K, V> s = segmentList.get(i);
    if (s.inMemory()) {
      int offset = (i == 0) ? firstSegmentOffset : 0;
      s.getReader().reset(offset);
    } else {
      s.closeReader();
      if (i == 0) {
        s.reinitReader(firstSegmentOffset);
        s.getReader().disableChecksumValidation();
      }
    }
  }

  currentKVOffset = firstSegmentOffset;
  nextKVOffset = -1;
  readSegmentIndex = 0;
  hasMore = false;
  lastSegmentEOF = false;

  LOG.debug("Reset - First segment offset is " + firstSegmentOffset
      + " Segment List Size is " + segmentList.size());
}
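// A minimal sketch of the mark/reset contract that mark() and reset() above
// implement, shown through the public MarkableIterator wrapper from
// org.apache.hadoop.mapreduce, which delegates to a backup store like this
// one inside a reducer. The method name and Text value type are illustrative
// only, not part of the original code.
void replayValues(MarkableIterator<Text> values) throws IOException {
  values.mark();                 // remember the current read position
  while (values.hasNext()) {
    values.next();               // first pass over the values
  }
  values.reset();                // rewind to the mark
  while (values.hasNext()) {
    values.next();               // second pass replays the same values
  }
}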
private void clearSegmentList() throws IOException {
  for (Segment<K, V> segment : segmentList) {
    long len = segment.getLength();
    segment.close();
    if (segment.inMemory()) {
      memCache.unreserve(len);
    }
  }
  segmentList.clear();
}
/**
 * This method creates a memory segment from the existing buffer.
 * @throws IOException
 */
void createInMemorySegment() throws IOException {
  // If nothing was written in this block because the record size was
  // greater than the allocated block size, just return.
  if (usedSize == 0) {
    ramManager.unreserve(blockSize);
    return;
  }

  // spaceAvailable would have ensured that there is enough space left for
  // the EOF markers.
  assert ((blockSize - usedSize) >= EOF_MARKER_SIZE);

  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);
  WritableUtils.writeVInt(dataOut, IFile.EOF_MARKER);

  usedSize += EOF_MARKER_SIZE;

  ramManager.unreserve(blockSize - usedSize);

  Reader<K, V> reader =
      new org.apache.hadoop.mapreduce.task.reduce.InMemoryReader<K, V>(null,
          (org.apache.hadoop.mapred.TaskAttemptID) tid, dataOut.getData(), 0,
          usedSize, conf);
  Segment<K, V> segment = new Segment<K, V>(reader, false);
  segmentList.add(segment);
  LOG.debug("Added Memory Segment to List. List Size is "
      + segmentList.size());
}
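// For reference, a sketch of how the paired EOF markers written above are
// detected on the read side; this helper is illustrative, not part of the
// original code. IFile.EOF_MARKER is -1, and a key length of -1 followed by
// a value length of -1 signals end of stream in the IFile format.
static boolean atEndOfStream(java.io.DataInputStream in) throws IOException {
  int keyLength = WritableUtils.readVInt(in);
  int valueLength = WritableUtils.readVInt(in);
  return keyLength == IFile.EOF_MARKER && valueLength == IFile.EOF_MARKER;
}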
void createInDiskSegment() throws IOException {
  assert (writer != null);
  writer.close();
  Segment<K, V> s = new Segment<K, V>(conf, fs, file, null, true);
  writer = null;
  segmentList.add(s);
  LOG.debug("Disk Segment added to List. Size is " + segmentList.size());
}
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  TaskAttemptID dummyMapId = inputs.get(0).getMapId();
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize =
      createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  InMemoryMapOutput<K, V> mergedMapOutputs =
      unconditionalReserve(dummyMapId, mergeOutputSize, false);

  Writer<K, V> writer =
      new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());

  LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments
      + " segments of total-size: " + mergeOutputSize);

  RawKeyValueIterator rIter =
      Merger.merge(jobConf, rfs,
          (Class<K>) jobConf.getMapOutputKeyClass(),
          (Class<V>) jobConf.getMapOutputValueClass(),
          inMemorySegments, inMemorySegments.size(),
          new Path(reduceId.toString()),
          (RawComparator<K>) jobConf.getOutputKeyComparator(),
          reporter, null, null, null);
  Merger.writeFile(rIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId + " Memory-to-Memory merge of the " + noInMemorySegments
      + " files in-memory complete.");

  // Note the output of the merge.
  closeInMemoryMergedFile(mergedMapOutputs);
}
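// Context note: memory-to-memory merges like the one above are normally
// gated by job configuration. A sketch of enabling it, assuming the stock
// Hadoop property name (verify against MRJobConfig for your release):
static void enableMemToMemMerge(JobConf job) {
  job.setBoolean("mapreduce.reduce.merge.memtomem.enabled", true);
}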
private List<Segment<Text, Text>> getUncompressedSegments()
    throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getUncompressedSegment(i));
  }
  return segments;
}
private List<Segment<Text, Text>> getCompressedSegments()
    throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 0; i < 2; i++) {
    segments.add(getCompressedSegment(i));
  }
  return segments;
}
private List<Segment<Text, Text>> getUncompressedSegments()
    throws IOException {
  List<Segment<Text, Text>> segments = new ArrayList<Segment<Text, Text>>();
  for (int i = 1; i < 10; i++) {
    segments.add(getUncompressedSegment(i));
  }
  return segments;
}