MapSpiller(JobConf job,TaskAttemptID tid, TaskReporter rep) throws ClassNotFoundException { reporter = rep; conf = job; this.taskId = tid; mapOutputFile.setConf(conf); mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES); Counters.Counter combineInputCounter = reporter.getCounter(COMBINE_INPUT_RECORDS); combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS); fileOutputByteCounter = reporter.getCounter(MAP_OUTPUT_MATERIALIZED_BYTES); // combiner combinerRunner = CombinerRunner.create(conf, taskId, combineInputCounter, reporter, null); if (combinerRunner != null) { combineCollector= new CombineOutputCollector(combineOutputCounter, reporter, conf); } else { combineCollector = null; } indexCacheList = new ArrayList<SpillRecord>(); spilledRecordsCounter = reporter.getCounter(Counter.SPILLED_RECORDS); }
public static <K, V> ICombineHandler create(TaskContext context) throws IOException, ClassNotFoundException { final JobConf conf = new JobConf(context.getConf()); conf.set(Constants.SERIALIZATION_FRAMEWORK, String.valueOf(SerializationFramework.WRITABLE_SERIALIZATION.getType())); String combinerClazz = conf.get(Constants.MAPRED_COMBINER_CLASS); if (null == combinerClazz) { combinerClazz = conf.get(MRJobConfig.COMBINE_CLASS_ATTR); } if (null == combinerClazz) { return null; } else { LOG.info("NativeTask Combiner is enabled, class = " + combinerClazz); } final Counter combineInputCounter = context.getTaskReporter().getCounter( TaskCounter.COMBINE_INPUT_RECORDS); final CombinerRunner<K, V> combinerRunner = CombinerRunner.create( conf, context.getTaskAttemptId(), combineInputCounter, context.getTaskReporter(), null); final INativeHandler nativeHandler = NativeBatchProcessor.create( NAME, conf, DataChannel.INOUT); @SuppressWarnings("unchecked") final BufferPusher<K, V> pusher = new BufferPusher<K, V>((Class<K>)context.getInputKeyClass(), (Class<V>)context.getInputValueClass(), nativeHandler); final BufferPuller puller = new BufferPuller(nativeHandler); return new CombinerHandler<K, V>(nativeHandler, combinerRunner, puller, pusher); }
public CombinerHandler(INativeHandler nativeHandler, CombinerRunner<K, V> combiner, BufferPuller puller, BufferPusher<K, V> kvPusher) throws IOException { this.nativeHandler = nativeHandler; this.combinerRunner = combiner; this.puller = puller; this.kvPusher = kvPusher; nativeHandler.setCommandDispatcher(this); nativeHandler.setDataReceiver(puller); }
@Before public void setUp() throws IOException { this.nativeHandler = Mockito.mock(INativeHandler.class); this.pusher = Mockito.mock(BufferPusher.class); this.puller = Mockito.mock(BufferPuller.class); this.combinerRunner = Mockito.mock(CombinerRunner.class); Mockito.when(nativeHandler.getInputBuffer()).thenReturn( new InputBuffer(BufferType.HEAP_BUFFER, 100)); }