@Override
public void init(ShuffleConsumerPlugin.Context context) {
  // Keep the whole context, then unpack the pieces this shuffle needs.
  this.context = context;

  this.reduceId = context.getReduceId();
  this.jobConf = context.getJobConf();
  this.umbilical = context.getUmbilical();
  this.reporter = context.getReporter();
  this.copyPhase = context.getCopyPhase();
  this.taskStatus = context.getStatus();
  this.reduceTask = context.getReduceTask();
  this.localMapFiles = context.getLocalMapFiles();

  // Shuffle metrics are keyed by this reduce attempt's id and job config.
  this.metrics = new ShuffleClientMetrics(reduceId, jobConf);

  // The scheduler decides which map outputs to fetch (and retries); the
  // merge manager absorbs and merges the fetched output.
  scheduler =
      new ShuffleSchedulerImpl<K, V>(
          jobConf,
          taskStatus,
          reduceId,
          this,
          copyPhase,
          context.getShuffledMapsCounter(),
          context.getReduceShuffleBytes(),
          context.getFailedShuffleCounter());
  merger = createMergeManager(context);
}
@Override public void init(ShuffleConsumerPlugin.Context<K, V> context) { // just verify that Context has kept its public interface context.getReduceId(); context.getJobConf(); context.getLocalFS(); context.getUmbilical(); context.getLocalDirAllocator(); context.getReporter(); context.getCodec(); context.getCombinerClass(); context.getCombineCollector(); context.getSpilledRecordsCounter(); context.getReduceCombineInputCounter(); context.getShuffledMapsCounter(); context.getReduceShuffleBytes(); context.getFailedShuffleCounter(); context.getMergedMapOutputsCounter(); context.getStatus(); context.getCopyPhase(); context.getMergePhase(); context.getReduceTask(); context.getMapOutputFile(); }
@Override
public void init(ShuffleConsumerPlugin.Context context) {
  // Retain the context and cache the fields this implementation uses.
  this.context = context;

  this.reduceId = context.getReduceId();
  this.jobConf = context.getJobConf();
  this.umbilical = context.getUmbilical();
  this.reporter = context.getReporter();
  this.copyPhase = context.getCopyPhase();
  this.taskStatus = context.getStatus();
  this.reduceTask = context.getReduceTask();

  // Per-attempt shuffle client metrics.
  this.metrics = new ShuffleClientMetrics(reduceId, jobConf);

  // Fetch scheduling (ordering, retries, failure accounting) is delegated to
  // the scheduler; merging of fetched output to the merge manager.
  scheduler =
      new ShuffleSchedulerImpl<K, V>(
          jobConf,
          taskStatus,
          reduceId,
          this,
          copyPhase,
          context.getShuffledMapsCounter(),
          context.getReduceShuffleBytes(),
          context.getFailedShuffleCounter());
  merger = createMergeManager(context);
}
@Override public void init(ShuffleConsumerPlugin.Context context) { this.reduceId = context.getReduceId(); this.jobConf = context.getJobConf(); this.umbilical = context.getUmbilical(); this.reporter = context.getReporter(); this.copyPhase = context.getCopyPhase(); this.mergePhase = context.getMergePhase(); this.taskStatus = context.getStatus(); this.reduceTask = context.getReduceTask(); this.codec = context.getCodec(); this.spilledRecordsCounter = context.getSpilledRecordsCounter(); this.mergedMapOutputsCounter = context.getMergedMapOutputsCounter(); jobConf.setBoolean(MRConfig.MAPRED_IFILE_READAHEAD, false); try { lustrefs = (LustreFileSystem)FileSystem.get(LustreFileSystem.NAME, jobConf); mapOutputDir = SharedFsPlugins.getTempPath(jobConf, JobID.downgrade(reduceId.getJobID())); reduceDir = new Path(mapOutputDir, String.format(SharedFsPlugins.MAP_OUTPUT, reduceId.getTaskID().getId(), 0, 0)).getParent(); mergeTempDir = new Path(mapOutputDir, "temp"); } catch (IOException ioe) { throw new RuntimeException("Map Output directory not found !!", ioe); } // Scheduler scheduler = new ShuffleSchedulerImpl<K, V>( jobConf, taskStatus, reduceId, this, copyPhase, context.getShuffledMapsCounter(), context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); this.ioSortFactor = jobConf.getInt(MRJobConfig.IO_SORT_FACTOR, 100); this.merger = new FileMerger(); this.merger.start(); }
/**
 * Builds the default merge manager, which absorbs fetched map output
 * (spilling to local disk as needed), merges segments, and applies the
 * configured combiner when one is present.
 */
protected MergeManager<K, V> createMergeManager(
    ShuffleConsumerPlugin.Context context) {
  return new MergeManagerImpl<K, V>(
      reduceId,
      jobConf,
      context.getLocalFS(),
      context.getLocalDirAllocator(),
      reporter,
      context.getCodec(),
      context.getCombinerClass(),
      context.getCombineCollector(),
      context.getSpilledRecordsCounter(),
      context.getReduceCombineInputCounter(),
      context.getMergedMapOutputsCounter(),
      this,
      context.getMergePhase(),
      context.getMapOutputFile());
}
@Test /** * A testing method instructing core hadoop to load an external ShuffleConsumerPlugin * as if it came from a 3rd party. */ public void testPluginAbility() { try{ // create JobConf with mapreduce.job.shuffle.consumer.plugin=TestShuffleConsumerPlugin JobConf jobConf = new JobConf(); jobConf.setClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, TestShufflePlugin.TestShuffleConsumerPlugin.class, ShuffleConsumerPlugin.class); ShuffleConsumerPlugin shuffleConsumerPlugin = null; Class<? extends ShuffleConsumerPlugin> clazz = jobConf.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class); assertNotNull("Unable to get " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, clazz); // load 3rd party plugin through core's factory method shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, jobConf); assertNotNull("Unable to load " + MRConfig.SHUFFLE_CONSUMER_PLUGIN, shuffleConsumerPlugin); } catch (Exception e) { assertTrue("Threw exception:" + e, false); } }