@SuppressWarnings("unchecked") @Override protected void setup(Context context) throws IOException, InterruptedException { int n = context.getConfiguration().getStrings(conf_key()).length; WrappedReducer wrappedReducer = new WrappedReducer(); for (int i = 0; i < n; i++) { final int finalI = i; contexts.add(wrappedReducer.new Context(context) { @Override public void write(Object key, Object value) throws IOException, InterruptedException { super.write(new PerMapperOutputKey(finalI, key), new PerMapperOutputValue(finalI, value)); } }); } super.setup(context); }
@SuppressWarnings("unchecked") protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context createReduceContext(org.apache.hadoop.mapreduce.Reducer <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer, Configuration job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter, org.apache.hadoop.mapreduce.Counter inputKeyCounter, org.apache.hadoop.mapreduce.Counter inputValueCounter, org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, org.apache.hadoop.mapreduce.OutputCommitter committer, org.apache.hadoop.mapreduce.StatusReporter reporter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass ) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> reduceContext = new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass); org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context reducerContext = new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext( reduceContext); return reducerContext; }
/**
 * Create a reduce context that is based on ChainMapContext and the given
 * record writer.
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createReduceContext(
    RecordWriter<KEYOUT, VALUEOUT> rw,
    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) {
  ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reduceContext =
      new ChainReduceContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(context, rw, conf);
  Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context reducerContext =
      new WrappedReducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
          .getReducerContext(reduceContext);
  return reducerContext;
}
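// Hedged usage sketch (not from the original source): how chain code might hand
// the context built by the factory above to a user-supplied reducer. The method
// name and parameter list are hypothetical; only createReduceContext is real.
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT> void runChainedReducer(
    Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> reducer,
    RecordWriter<KEYOUT, VALUEOUT> rw,
    ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
    Configuration conf) throws IOException, InterruptedException {
  // run() drives the full lifecycle: setup(), reduce() per key group, cleanup().
  reducer.run(createReduceContext(rw, context, conf));
}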
@SuppressWarnings({ "rawtypes", "unchecked" }) public Reducer.Context createReduceContext(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriter output, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws HyracksDataException { try { return new WrappedReducer().getReducerContext(new ReduceContextImpl(conf, taskid, input, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass)); } catch (Exception e) { throw new HyracksDataException(e); } }
@SuppressWarnings("unchecked") @Override protected void setup(final Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); outputPaths = Lists.newArrayList(conf.getTrimmedStringCollection(MultiJob.OUTPUT_FORMAT_PATH)); @SuppressWarnings("unchecked") Class<Reducer>[] reducersClass = (Class<Reducer>[]) conf.getClasses(conf_key()); reducers = new ArrayList<Reducer>(reducersClass.length); cleanups = new ArrayList<Method>(reducersClass.length); reduces = new ArrayList<Method>(reducersClass.length); contexts = new ArrayList<Reducer<PerMapperOutputKey, PerMapperOutputValue, KEYOUT, VALUEOUT>.Context>(); if (outputPaths.isEmpty()) { Iterables.addAll(outputPaths, Iterables.limit(Iterables.cycle(""), reducersClass.length)); } WrappedReducer wrappedReducer = new WrappedReducer(); for (int i = 0; i < reducersClass.length; i++) { Class<Reducer> reducerClass = reducersClass[i]; Reducer reducer = ReflectionUtils.newInstance(reducerClass, conf); final int finalI = i; WrappedReducer.Context myContext = wrappedReducer.new Context(context) { @Override public void write(Object key, Object value) throws IOException, InterruptedException { context.write((KEYOUT) new PerReducerOutputKey(finalI, key), (VALUEOUT)value); } }; contexts.add(myContext); reducers.add(reducer); Methods.invoke(Methods.get(reducerClass, "setup", Context.class), reducer, getContextForReducer(context, i)); cleanups.add(Methods.get(reducerClass, "cleanup", Context.class)); reduces.add(Methods.getWithNameMatches(reducerClass, "reduce")); } }
@Test(timeout = 3000)
public void testLoadJobLoadReducer() throws Exception {
  LoadJob.LoadReducer test = new LoadJob.LoadReducer();

  Configuration conf = new Configuration();
  conf.setInt(JobContext.NUM_REDUCES, 2);
  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(FileOutputFormat.COMPRESS, true);
  conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);

  TaskAttemptID taskid = new TaskAttemptID();
  RawKeyValueIterator input = new FakeRawKeyValueIterator();
  Counter counter = new GenericCounter();
  Counter inputValueCounter = new GenericCounter();
  LoadRecordWriter output = new LoadRecordWriter();
  OutputCommitter committer = new CustomOutputCommitter();
  StatusReporter reporter = new DummyReporter();
  RawComparator<GridmixKey> comparator = new FakeRawComparator();

  ReduceContext<GridmixKey, GridmixRecord, NullWritable, GridmixRecord> reduceContext =
      new ReduceContextImpl<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>(
          conf, taskid, input, counter, inputValueCounter, output, committer,
          reporter, comparator, GridmixKey.class, GridmixRecord.class);
  // Consume the first record up front.
  reduceContext.nextKeyValue();
  org.apache.hadoop.mapreduce.Reducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>.Context
      context = new WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>()
          .getReducerContext(reduceContext);

  // No explicit test.setup(context): run() invokes setup() itself.
  test.run(context);

  // 9 records have been read (10 total, minus the one consumed above).
  assertEquals(9, counter.getValue());
  assertEquals(10, inputValueCounter.getValue());
  assertEquals(1, output.getData().size());
  GridmixRecord record = output.getData().values().iterator().next();
  assertEquals(1593, record.getSize());
}
@Test(timeout = 3000)
public void testSleepReducer() throws Exception {
  Configuration conf = new Configuration();
  conf.setInt(JobContext.NUM_REDUCES, 2);
  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(FileOutputFormat.COMPRESS, true);
  conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);

  TaskAttemptID taskId = new TaskAttemptID();
  RawKeyValueIterator input = new FakeRawKeyValueReducerIterator();
  Counter counter = new GenericCounter();
  Counter inputValueCounter = new GenericCounter();
  RecordWriter<NullWritable, NullWritable> output = new LoadRecordReduceWriter();
  OutputCommitter committer = new CustomOutputCommitter();
  StatusReporter reporter = new DummyReporter();
  RawComparator<GridmixKey> comparator = new FakeRawComparator();

  ReduceContext<GridmixKey, NullWritable, NullWritable, NullWritable> reduceContext =
      new ReduceContextImpl<GridmixKey, NullWritable, NullWritable, NullWritable>(
          conf, taskId, input, counter, inputValueCounter, output, committer,
          reporter, comparator, GridmixKey.class, NullWritable.class);
  org.apache.hadoop.mapreduce.Reducer<GridmixKey, NullWritable, NullWritable, NullWritable>.Context
      context = new WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>()
          .getReducerContext(reduceContext);

  SleepReducer test = new SleepReducer();
  long start = System.currentTimeMillis();
  test.setup(context);
  long sleeper = context.getCurrentKey().getReduceOutputBytes();
  // The status has been updated to reflect the sleep.
  assertEquals("Sleeping... " + sleeper + " ms left", context.getStatus());
  // setup() should have slept for about 0.9 sec.
  assertTrue(System.currentTimeMillis() >= (start + sleeper));
  test.cleanup(context);
  // The status has been updated again after cleanup.
  assertEquals("Slept for " + sleeper, context.getStatus());
}