@SuppressWarnings("unchecked") WrappingContext(Configuration configuration, TaskAttemptID id, OutputCommitter outputCommitter, RecordWriter<KEYOUT, VALUEOUT> output, KeyValueProducer<KEYIN, Iterable<VALUEIN>> transport, MapOutputAccumulator<KEYOUT, VALUEOUT> consumer ) throws IOException, InterruptedException { //Override the actual key and value class with Writables, to ensure that constructor //will not throw exception if SerializationFactory does not support that class. //Any actual serialization/deserialization is performed by DataTransport, so this //factory is never used. super(); impl = new ReduceContextImpl(configuration, id, new DummyRawIterator() , null, null, output, outputCommitter, new TaskAttemptContextImpl.DummyReporter(), null, Writable.class, Writable.class); this.transport = transport; this.mapOutputAccumulatorCallback = consumer; }
@SuppressWarnings("unchecked") protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context createReduceContext(org.apache.hadoop.mapreduce.Reducer <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer, Configuration job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter, org.apache.hadoop.mapreduce.Counter inputKeyCounter, org.apache.hadoop.mapreduce.Counter inputValueCounter, org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, org.apache.hadoop.mapreduce.OutputCommitter committer, org.apache.hadoop.mapreduce.StatusReporter reporter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass ) throws IOException, InterruptedException { org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> reduceContext = new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass); org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context reducerContext = new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext( reduceContext); return reducerContext; }
@SuppressWarnings({ "rawtypes", "unchecked" }) public Reducer.Context createReduceContext(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriter output, OutputCommitter committer, StatusReporter reporter, RawComparator comparator, Class keyClass, Class valueClass) throws HyracksDataException { try { return new WrappedReducer().getReducerContext(new ReduceContextImpl(conf, taskid, input, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass)); } catch (Exception e) { throw new HyracksDataException(e); } }
@Test (timeout=3000) public void testLoadJobLoadReducer() throws Exception { LoadJob.LoadReducer test = new LoadJob.LoadReducer(); Configuration conf = new Configuration(); conf.setInt(JobContext.NUM_REDUCES, 2); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(FileOutputFormat.COMPRESS, true); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); TaskAttemptID taskid = new TaskAttemptID(); RawKeyValueIterator input = new FakeRawKeyValueIterator(); Counter counter = new GenericCounter(); Counter inputValueCounter = new GenericCounter(); LoadRecordWriter output = new LoadRecordWriter(); OutputCommitter committer = new CustomOutputCommitter(); StatusReporter reporter = new DummyReporter(); RawComparator<GridmixKey> comparator = new FakeRawComparator(); ReduceContext<GridmixKey, GridmixRecord, NullWritable, GridmixRecord> reduceContext = new ReduceContextImpl<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>( conf, taskid, input, counter, inputValueCounter, output, committer, reporter, comparator, GridmixKey.class, GridmixRecord.class); // read for previous data reduceContext.nextKeyValue(); org.apache.hadoop.mapreduce.Reducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>.Context context = new WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>() .getReducerContext(reduceContext); // test.setup(context); test.run(context); // have been readed 9 records (-1 for previous) assertEquals(9, counter.getValue()); assertEquals(10, inputValueCounter.getValue()); assertEquals(1, output.getData().size()); GridmixRecord record = output.getData().values().iterator() .next(); assertEquals(1593, record.getSize()); }
@Test (timeout=3000) public void testSleepReducer() throws Exception { Configuration conf = new Configuration(); conf.setInt(JobContext.NUM_REDUCES, 2); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(FileOutputFormat.COMPRESS, true); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); TaskAttemptID taskId = new TaskAttemptID(); RawKeyValueIterator input = new FakeRawKeyValueReducerIterator(); Counter counter = new GenericCounter(); Counter inputValueCounter = new GenericCounter(); RecordWriter<NullWritable, NullWritable> output = new LoadRecordReduceWriter(); OutputCommitter committer = new CustomOutputCommitter(); StatusReporter reporter = new DummyReporter(); RawComparator<GridmixKey> comparator = new FakeRawComparator(); ReduceContext<GridmixKey, NullWritable, NullWritable, NullWritable> reducecontext = new ReduceContextImpl<GridmixKey, NullWritable, NullWritable, NullWritable>( conf, taskId, input, counter, inputValueCounter, output, committer, reporter, comparator, GridmixKey.class, NullWritable.class); org.apache.hadoop.mapreduce.Reducer<GridmixKey, NullWritable, NullWritable, NullWritable>.Context context = new WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>() .getReducerContext(reducecontext); SleepReducer test = new SleepReducer(); long start = System.currentTimeMillis(); test.setup(context); long sleeper = context.getCurrentKey().getReduceOutputBytes(); // status has been changed assertEquals("Sleeping... " + sleeper + " ms left", context.getStatus()); // should sleep 0.9 sec assertTrue(System.currentTimeMillis() >= (start + sleeper)); test.cleanup(context); // status has been changed again assertEquals("Slept for " + sleeper, context.getStatus()); }
@Test (timeout=1000) public void testLoadJobLoadReducer() throws Exception { LoadJob.LoadReducer test = new LoadJob.LoadReducer(); Configuration conf = new Configuration(); conf.setInt(JobContext.NUM_REDUCES, 2); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(FileOutputFormat.COMPRESS, true); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); TaskAttemptID taskid = new TaskAttemptID(); RawKeyValueIterator input = new FakeRawKeyValueIterator(); Counter counter = new GenericCounter(); Counter inputValueCounter = new GenericCounter(); LoadRecordWriter output = new LoadRecordWriter(); OutputCommitter committer = new CustomOutputCommitter(); StatusReporter reporter = new DummyReporter(); RawComparator<GridmixKey> comparator = new FakeRawComparator(); ReduceContext<GridmixKey, GridmixRecord, NullWritable, GridmixRecord> reduceContext = new ReduceContextImpl<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>( conf, taskid, input, counter, inputValueCounter, output, committer, reporter, comparator, GridmixKey.class, GridmixRecord.class); // read for previous data reduceContext.nextKeyValue(); org.apache.hadoop.mapreduce.Reducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>.Context context = new WrappedReducer<GridmixKey, GridmixRecord, NullWritable, GridmixRecord>() .getReducerContext(reduceContext); // test.setup(context); test.run(context); // have been readed 9 records (-1 for previous) assertEquals(9, counter.getValue()); assertEquals(10, inputValueCounter.getValue()); assertEquals(1, output.getData().size()); GridmixRecord record = output.getData().values().iterator() .next(); assertEquals(1593, record.getSize()); }
@Test (timeout=1000) public void testSleepReducer() throws Exception { Configuration conf = new Configuration(); conf.setInt(JobContext.NUM_REDUCES, 2); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(FileOutputFormat.COMPRESS, true); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); TaskAttemptID taskId = new TaskAttemptID(); RawKeyValueIterator input = new FakeRawKeyValueReducerIterator(); Counter counter = new GenericCounter(); Counter inputValueCounter = new GenericCounter(); RecordWriter<NullWritable, NullWritable> output = new LoadRecordReduceWriter(); OutputCommitter committer = new CustomOutputCommitter(); StatusReporter reporter = new DummyReporter(); RawComparator<GridmixKey> comparator = new FakeRawComparator(); ReduceContext<GridmixKey, NullWritable, NullWritable, NullWritable> reducecontext = new ReduceContextImpl<GridmixKey, NullWritable, NullWritable, NullWritable>( conf, taskId, input, counter, inputValueCounter, output, committer, reporter, comparator, GridmixKey.class, NullWritable.class); org.apache.hadoop.mapreduce.Reducer<GridmixKey, NullWritable, NullWritable, NullWritable>.Context context = new WrappedReducer<GridmixKey, NullWritable, NullWritable, NullWritable>() .getReducerContext(reducecontext); SleepReducer test = new SleepReducer(); long start = System.currentTimeMillis(); test.setup(context); long sleeper = context.getCurrentKey().getReduceOutputBytes(); // status has been changed assertEquals("Sleeping... " + sleeper + " ms left", context.getStatus()); // should sleep 0.9 sec assertTrue(System.currentTimeMillis() >= (start + sleeper)); test.cleanup(context); // status has been changed again assertEquals("Slept for " + sleeper, context.getStatus()); }