public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                         RawKeyValueIterator input,
                         Counter inputKeyCounter,
                         Counter inputValueCounter,
                         RecordWriter<KEYOUT,VALUEOUT> output,
                         OutputCommitter committer,
                         StatusReporter reporter,
                         RawComparator<KEYIN> comparator,
                         Class<KEYIN> keyClass,
                         Class<VALUEIN> valueClass
                        ) throws InterruptedException, IOException {
  super(conf, taskid, output, committer, reporter);
  this.input = input;
  this.inputKeyCounter = inputKeyCounter;
  this.inputValueCounter = inputValueCounter;
  this.comparator = comparator;
  this.serializationFactory = new SerializationFactory(conf);
  this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
  this.keyDeserializer.open(buffer);
  this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
  this.valueDeserializer.open(buffer);
  hasMore = input.next();
  this.keyClass = keyClass;
  this.valueClass = valueClass;
  this.conf = conf;
  this.taskid = taskid;
}
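// For context, the framework drives a ReduceContextImpl through the
// Reducer.run() loop, roughly as sketched below. This is a hedged
// paraphrase of the org.apache.hadoop.mapreduce.Reducer.run() contract,
// not a verbatim copy; nextKey()/getCurrentKey()/getValues() are the real
// Context methods that the deserializers and comparator wired up above
// ultimately serve.
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    // Each nextKey() advances past the values of the previous key using
    // the grouping comparator; getValues() lazily deserializes the values.
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
  } finally {
    cleanup(context);
  }
}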
@Override
public RawKeyValueIterator close() throws Throwable {
  // Wait for on-going merges to complete
  if (memToMemMerger != null) {
    memToMemMerger.close();
  }
  inMemoryMerger.close();
  onDiskMerger.close();

  List<InMemoryMapOutput<K, V>> memory =
      new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
  inMemoryMergedMapOutputs.clear();
  memory.addAll(inMemoryMapOutputs);
  inMemoryMapOutputs.clear();
  List<CompressAwarePath> disk =
      new ArrayList<CompressAwarePath>(onDiskMapOutputs);
  onDiskMapOutputs.clear();
  return finalMerge(jobConf, rfs, memory, disk);
}
private void combineAndSpill(RawKeyValueIterator kvIter,
                             Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  // Group values with the combiner's key-grouping comparator, which falls
  // back to the map-output sort comparator when none is configured.
  RawComparator<K> comparator =
      (RawComparator<K>) job.getCombinerKeyGroupingComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL, inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector, Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
public ReduceContext(Configuration conf, TaskAttemptID taskid,
                     RawKeyValueIterator input,
                     Counter inputKeyCounter,
                     Counter inputValueCounter,
                     RecordWriter<KEYOUT,VALUEOUT> output,
                     OutputCommitter committer,
                     StatusReporter reporter,
                     RawComparator<KEYIN> comparator,
                     Class<KEYIN> keyClass,
                     Class<VALUEIN> valueClass
                    ) throws InterruptedException, IOException {
  super(conf, taskid, output, committer, reporter);
  this.input = input;
  this.inputKeyCounter = inputKeyCounter;
  this.inputValueCounter = inputValueCounter;
  this.comparator = comparator;
  SerializationFactory serializationFactory = new SerializationFactory(conf);
  this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
  this.keyDeserializer.open(buffer);
  this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
  this.valueDeserializer.open(buffer);
  hasMore = input.next();
}
private void combineAndSpill(RawKeyValueIterator kvIter,
                             Counters.Counter inCounter) throws IOException {
  JobConf job = jobConf;
  Reducer combiner = ReflectionUtils.newInstance(combinerClass, job);
  Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
  Class<V> valClass = (Class<V>) job.getMapOutputValueClass();
  // Unlike the variant above, this one groups values with the map-output
  // sort comparator directly rather than a dedicated combiner grouping
  // comparator.
  RawComparator<K> comparator =
      (RawComparator<K>) job.getOutputKeyComparator();
  try {
    CombineValuesIterator values = new CombineValuesIterator(
        kvIter, comparator, keyClass, valClass, job, Reporter.NULL, inCounter);
    while (values.more()) {
      combiner.reduce(values.getKey(), values, combineCollector, Reporter.NULL);
      values.nextKey();
    }
  } finally {
    combiner.close();
  }
}
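// For illustration: a minimal old-API combiner of the sort that
// combineAndSpill() instantiates via ReflectionUtils.newInstance(
// combinerClass, job). SumCombiner is a hypothetical example class; only
// the Reducer/OutputCollector/Reporter signatures are the real
// org.apache.hadoop.mapred API.
public static class SumCombiner extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException {
    // Pre-aggregate the values for one key before they are spilled,
    // shrinking the data the merge passes have to move.
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}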
public ReduceContext(Configuration conf, TaskAttemptID taskid,
                     RawKeyValueIterator input,
                     Counter inputCounter,
                     RecordWriter<KEYOUT,VALUEOUT> output,
                     OutputCommitter committer,
                     StatusReporter reporter,
                     RawComparator<KEYIN> comparator,
                     Class<KEYIN> keyClass,
                     Class<VALUEIN> valueClass
                    ) throws InterruptedException, IOException {
  super(conf, taskid, output, committer, reporter);
  this.input = input;
  this.inputCounter = inputCounter;
  this.comparator = comparator;
  SerializationFactory serializationFactory = new SerializationFactory(conf);
  this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
  this.keyDeserializer.open(buffer);
  this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
  this.valueDeserializer.open(buffer);
  hasMore = input.next();
}
public RawKeyValueIterator runLocal(Path[] localFiles)
    throws IOException, InterruptedException {
  for (Path file : localFiles) {
    addMapOutputSegments(file);
  }
  try {
    return finish();
  } catch (Throwable e) {
    throw new Shuffle.ShuffleError("Error while doing final merge ", e);
  }
}
@SuppressWarnings("unchecked") public RawKeyValueIterator finish() throws Throwable { // merge config params Class<K> keyClass = (Class<K>) jobConf.getMapOutputKeyClass(); Class<V> valueClass = (Class<V>) jobConf.getMapOutputValueClass(); final RawComparator<K> comparator = (RawComparator<K>) jobConf.getOutputKeyComparator(); // Wait for on-going merges to complete merger.close(); LOG.info("finalMerge called with " + segmentsToBeMerged.size() + " on-disk map-outputs"); List<Segment<K, V>> segments = new ArrayList<Segment<K, V>>(); long onDiskBytes = 0; for (Segment<K, V> segment : segmentsToBeMerged) { long fileLength = segment.getLength(); onDiskBytes += fileLength; LOG.debug("Disk file: " + segment + " Length is " + fileLength); segments.add(segment); } segmentsToBeMerged.clear(); LOG.info("Merging " + segmentsToBeMerged.size() + " files, " + onDiskBytes + " bytes from disk"); Collections.sort(segments, new Comparator<Segment<K, V>>() { public int compare(Segment<K, V> o1, Segment<K, V> o2) { if (o1.getLength() == o2.getLength()) { return 0; } return o1.getLength() < o2.getLength() ? -1 : 1; } }); return Merger.merge(jobConf, lustrefs, keyClass, valueClass, segments, segments.size(), mergeTempDir, comparator, reporter, spilledRecordsCounter, null, null); }
@Override
public void merge(List<InMemoryMapOutput<K, V>> inputs) throws IOException {
  if (inputs == null || inputs.size() == 0) {
    return;
  }

  TaskAttemptID dummyMapId = inputs.get(0).getMapId();
  List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();
  long mergeOutputSize = createInMemorySegments(inputs, inMemorySegments, 0);
  int noInMemorySegments = inMemorySegments.size();

  InMemoryMapOutput<K, V> mergedMapOutputs =
      unconditionalReserve(dummyMapId, mergeOutputSize, false);
  Writer<K, V> writer =
      new InMemoryWriter<K, V>(mergedMapOutputs.getArrayStream());

  LOG.info("Initiating Memory-to-Memory merge with " + noInMemorySegments
      + " segments of total-size: " + mergeOutputSize);

  RawKeyValueIterator rIter =
      Merger.merge(jobConf, rfs,
          (Class<K>) jobConf.getMapOutputKeyClass(),
          (Class<V>) jobConf.getMapOutputValueClass(),
          inMemorySegments, inMemorySegments.size(),
          new Path(reduceId.toString()),
          (RawComparator<K>) jobConf.getOutputKeyComparator(),
          reporter, null, null, null);
  Merger.writeFile(rIter, writer, reporter, jobConf);
  writer.close();

  LOG.info(reduceId + " Memory-to-Memory merge of the " + noInMemorySegments
      + " files in-memory complete.");

  // Note the output of the merge
  closeInMemoryMergedFile(mergedMapOutputs);
}
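// The memory-to-memory merger above only runs when explicitly enabled;
// by default fetched map outputs go straight to the in-memory/on-disk
// mergers. A minimal sketch of turning it on, assuming a Hadoop version
// that exposes MRJobConfig.REDUCE_MEMTOMEM_ENABLED; the helper method
// name is hypothetical.
private JobConf enableMemToMemMerge(JobConf conf) {
  // Property per MRJobConfig.REDUCE_MEMTOMEM_ENABLED; off by default.
  conf.setBoolean("mapreduce.reduce.merge.memtomem.enabled", true);
  return conf;
}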
public BufferPullee(Class<IK> iKClass, Class<IV> iVClass,
                    RawKeyValueIterator rIter, NativeDataTarget target)
    throws IOException {
  this.rIter = rIter;
  tmpInputKey = new SizedWritable<IK>(iKClass);
  tmpInputValue = new SizedWritable<IV>(iVClass);
  if (null != iKClass && null != iVClass) {
    this.serializer = new KVSerializer<IK, IV>(iKClass, iVClass);
  }
  this.outputBuffer = target.getOutputBuffer();
  this.target = target;
}