@Test
public void testCustomCollect() throws Throwable {
  // mock creation
  TaskReporter mockTaskReporter = mock(TaskReporter.class);
  @SuppressWarnings("unchecked")
  Writer<String, Integer> mockWriter = mock(Writer.class);

  Configuration conf = new Configuration();
  conf.set(MRJobConfig.COMBINE_RECORDS_BEFORE_PROGRESS, "2");

  coc = new CombineOutputCollector<String, Integer>(outCounter, mockTaskReporter, conf);
  coc.setWriter(mockWriter);
  verify(mockTaskReporter, never()).progress();

  coc.collect("dummy", 1);
  verify(mockTaskReporter, never()).progress();

  coc.collect("dummy", 2);
  verify(mockTaskReporter, times(1)).progress();
}
@Test
public void testDefaultCollect() throws Throwable {
  // mock creation
  TaskReporter mockTaskReporter = mock(TaskReporter.class);
  @SuppressWarnings("unchecked")
  Writer<String, Integer> mockWriter = mock(Writer.class);

  Configuration conf = new Configuration();

  coc = new CombineOutputCollector<String, Integer>(outCounter, mockTaskReporter, conf);
  coc.setWriter(mockWriter);
  verify(mockTaskReporter, never()).progress();

  for (int i = 0; i < Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
    coc.collect("dummy", i);
  }
  verify(mockTaskReporter, times(1)).progress();

  for (int i = 0; i < Task.DEFAULT_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
    coc.collect("dummy", i);
  }
  verify(mockTaskReporter, times(2)).progress();
}
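// The tests above use `coc` and `outCounter` without declaring them. A minimal
// sketch of the assumed test-class fixture (field names are inferred from
// usage, not taken from the original source), together with the usual Mockito
// static imports (mock, verify, never, times):
private CombineOutputCollector<String, Integer> coc;
private Counters.Counter outCounter = new Counters.Counter();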
@Test
public void testCustomCollect() throws Throwable {
  // mock creation
  TaskReporter mockTaskReporter = mock(TaskReporter.class);
  Counters.Counter outCounter = new Counters.Counter();
  Writer<String, Integer> mockWriter = mock(Writer.class);

  Configuration conf = new Configuration();
  conf.set("mapred.combine.recordsBeforeProgress", "2");

  coc = new CombineOutputCollector<String, Integer>(outCounter, mockTaskReporter, conf);
  coc.setWriter(mockWriter);
  verify(mockTaskReporter, never()).progress();

  coc.collect("dummy", 1);
  verify(mockTaskReporter, never()).progress();

  coc.collect("dummy", 2);
  verify(mockTaskReporter, times(1)).progress();
}
@Test
public void testDefaultCollect() throws Throwable {
  // mock creation
  TaskReporter mockTaskReporter = mock(TaskReporter.class);
  Counters.Counter outCounter = new Counters.Counter();
  Writer<String, Integer> mockWriter = mock(Writer.class);

  Configuration conf = new Configuration();

  coc = new CombineOutputCollector<String, Integer>(outCounter, mockTaskReporter, conf);
  coc.setWriter(mockWriter);
  verify(mockTaskReporter, never()).progress();

  for (int i = 0; i < Task.DEFAULT_MR_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
    coc.collect("dummy", i);
  }
  verify(mockTaskReporter, times(1)).progress();

  for (int i = 0; i < Task.DEFAULT_MR_COMBINE_RECORDS_BEFORE_PROGRESS; i++) {
    coc.collect("dummy", i);
  }
  verify(mockTaskReporter, times(2)).progress();
}
MapSpiller(JobConf job, TaskAttemptID tid, TaskReporter rep)
    throws ClassNotFoundException {
  reporter = rep;
  conf = job;
  this.taskId = tid;
  mapOutputFile.setConf(conf);
  mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
  Counters.Counter combineInputCounter =
      reporter.getCounter(COMBINE_INPUT_RECORDS);
  combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter.getCounter(MAP_OUTPUT_MATERIALIZED_BYTES);

  // combiner
  combinerRunner = CombinerRunner.create(conf, taskId,
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    combineCollector =
        new CombineOutputCollector(combineOutputCounter, reporter, conf);
  } else {
    combineCollector = null;
  }

  indexCacheList = new ArrayList<SpillRecord>();
  spilledRecordsCounter = reporter.getCounter(Counter.SPILLED_RECORDS);
}
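// Sketch of how the collector created above is typically driven when a spill
// is combined, following the standard Hadoop combine pattern; `writer` and
// `kvIter` are hypothetical locals standing in for the spill's IFile.Writer
// and the in-memory key/value iterator, not names from this class:
//
//   combineCollector.setWriter(writer);
//   combinerRunner.combine(kvIter, combineCollector);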
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
               JobConf jobConf, FileSystem localFS,
               TaskUmbilicalProtocol umbilical,
               LocalDirAllocator localDirAllocator,
               Reporter reporter, CompressionCodec codec,
               Class<? extends Reducer> combinerClass,
               CombineOutputCollector<K,V> combineCollector,
               Counters.Counter spilledRecordsCounter,
               Counters.Counter reduceCombineInputCounter,
               Counters.Counter shuffledMapsCounter,
               Counters.Counter reduceShuffleBytes,
               Counters.Counter failedShuffleCounter,
               Counters.Counter mergedMapOutputsCounter,
               TaskStatus status, Progress copyPhase,
               Progress mergePhase, Task reduceTask,
               MapOutputFile mapOutputFile,
               Map<TaskAttemptID, MapOutputFile> localMapFiles) {
  this.reduceId = reduceId;
  this.jobConf = jobConf;
  this.localFS = localFS;
  this.umbilical = umbilical;
  this.localDirAllocator = localDirAllocator;
  this.reporter = reporter;
  this.codec = codec;
  this.combinerClass = combinerClass;
  this.combineCollector = combineCollector;
  this.spilledRecordsCounter = spilledRecordsCounter;
  this.reduceCombineInputCounter = reduceCombineInputCounter;
  this.shuffledMapsCounter = shuffledMapsCounter;
  this.reduceShuffleBytes = reduceShuffleBytes;
  this.failedShuffleCounter = failedShuffleCounter;
  this.mergedMapOutputsCounter = mergedMapOutputsCounter;
  this.status = status;
  this.copyPhase = copyPhase;
  this.mergePhase = mergePhase;
  this.reduceTask = reduceTask;
  this.mapOutputFile = mapOutputFile;
  this.localMapFiles = localMapFiles;
}
public Context(org.apache.hadoop.mapreduce.TaskAttemptID reduceId,
               JobConf jobConf, FileSystem localFS,
               TaskUmbilicalProtocol umbilical,
               LocalDirAllocator localDirAllocator,
               Reporter reporter, CompressionCodec codec,
               Class<? extends Reducer> combinerClass,
               CombineOutputCollector<K,V> combineCollector,
               Counters.Counter spilledRecordsCounter,
               Counters.Counter reduceCombineInputCounter,
               Counters.Counter shuffledMapsCounter,
               Counters.Counter reduceShuffleBytes,
               Counters.Counter failedShuffleCounter,
               Counters.Counter mergedMapOutputsCounter,
               TaskStatus status, Progress copyPhase,
               Progress mergePhase, Task reduceTask,
               MapOutputFile mapOutputFile) {
  this.reduceId = reduceId;
  this.jobConf = jobConf;
  this.localFS = localFS;
  this.umbilical = umbilical;
  this.localDirAllocator = localDirAllocator;
  this.reporter = reporter;
  this.codec = codec;
  this.combinerClass = combinerClass;
  this.combineCollector = combineCollector;
  this.spilledRecordsCounter = spilledRecordsCounter;
  this.reduceCombineInputCounter = reduceCombineInputCounter;
  this.shuffledMapsCounter = shuffledMapsCounter;
  this.reduceShuffleBytes = reduceShuffleBytes;
  this.failedShuffleCounter = failedShuffleCounter;
  this.mergedMapOutputsCounter = mergedMapOutputsCounter;
  this.status = status;
  this.copyPhase = copyPhase;
  this.mergePhase = mergePhase;
  this.reduceTask = reduceTask;
  this.mapOutputFile = mapOutputFile;
}