@SuppressWarnings({"rawtypes", "unchecked"}) @Test (timeout=10000) public void testLoadMapper() throws Exception { Configuration conf = new Configuration(); conf.setInt(JobContext.NUM_REDUCES, 2); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); TaskAttemptID taskId = new TaskAttemptID(); RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader(); LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter(); OutputCommitter committer = new CustomOutputCommitter(); StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter(); LoadSplit split = getLoadSplit(); MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext = new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>( conf, taskId, reader, writer, committer, reporter, split); // context Context ctx = new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>() .getMapContext(mapContext); reader.initialize(split, ctx); ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); CompressionEmulationUtil.setCompressionEmulationEnabled( ctx.getConfiguration(), true); LoadJob.LoadMapper mapper = new LoadJob.LoadMapper(); // setup, map, clean mapper.run(ctx); Map<GridmixKey, GridmixRecord> data = writer.getData(); // check result assertEquals(2, data.size()); }
/**
 * Utility to generate dummy Mapper#Context for use in Giraph internals.
 * This is the "key hack" to inject MapReduce-related data structures
 * containing YARN cluster metadata (and our GiraphConf from the AppMaster)
 * into our Giraph BSP task code.
 * @param tid the TaskAttemptID to construct this Mapper#Context from.
 * @return sort of a Mapper#Context if you squint just right.
 */
private Context buildProxyMapperContext(final TaskAttemptID tid) {
  MapContext mc = new MapContextImpl<Object, Object, Object, Object>(
      conf, // our Configuration, populated back at the GiraphYarnClient.
      tid,  // our TaskAttemptId, generated w/YARN app, container, attempt IDs
      null, // RecordReader here will never be used by Giraph
      null, // RecordWriter here will never be used by Giraph
      null, // OutputCommitter here will never be used by Giraph
      new TaskAttemptContextImpl.DummyReporter() { // goes in task logs for now
        @Override
        public void setStatus(String msg) {
          LOG.info("[STATUS: task-" + bspTaskId + "] " + msg);
        }
      },
      null); // Input split setting here will never be used by Giraph

  // now, we wrap our MapContext ref so we can produce a Mapper#Context
  WrappedMapper<Object, Object, Object, Object> wrappedMapper =
      new WrappedMapper<Object, Object, Object, Object>();
  return wrappedMapper.getMapContext(mc);
}
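For orientation, a minimal sketch of how such a proxy context might be driven. This is our illustration, not Giraph code: it assumes the surrounding task class exposes buildProxyMapperContext() and a bspTaskId field as above, and the TaskAttemptID components are made up.

// Hypothetical caller; the ID parts are illustrative, not Giraph's scheme.
TaskAttemptID tid = new TaskAttemptID(
    "giraph_yarn", 1,         // job identifier string and job number (made up)
    TaskType.MAP, bspTaskId,  // model each BSP worker as one "map" task
    0);                       // attempt number
Context proxy = buildProxyMapperContext(tid);
// delegates to the DummyReporter override above, so it lands in the task log
proxy.setStatus("BSP worker " + bspTaskId + " started");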
public StubContext(Configuration conf,
    RecordReader<Text, CopyListingFileStatus> reader, int taskId)
    throws IOException, InterruptedException {

  WrappedMapper<Text, CopyListingFileStatus, Text, Text> wrappedMapper =
      new WrappedMapper<>();

  MapContextImpl<Text, CopyListingFileStatus, Text, Text> contextImpl =
      new MapContextImpl<>(conf, getTaskAttemptID(taskId), reader, writer,
          null, reporter, null);

  this.reader = reader;
  mapperContext = wrappedMapper.getMapContext(contextImpl);
}
/**
 * Create a map context that is based on ChainMapContext and the given record
 * reader and record writer.
 */
private <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context createMapContext(
        RecordReader<KEYIN, VALUEIN> rr, RecordWriter<KEYOUT, VALUEOUT> rw,
        TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> context,
        Configuration conf) {
  MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext =
      new ChainMapContextImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT>(
          context, rr, rw, conf);
  Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context mapperContext =
      new WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>()
          .getMapContext(mapContext);
  return mapperContext;
}
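The essence of the chaining above is that the downstream stage is exposed to the upstream mapper as its RecordWriter, so records flow from map to map without intermediate files. Below is a conceptual sketch of that handoff; it assumes nothing about ChainMapContextImpl's internals (which are package-private), and the queue-based pipe is our illustration.

import java.util.AbstractMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

class PipeSketch {
  // Conceptual only: the upstream mapper's "output" is a pipe that the
  // next stage drains, so chained mappers never materialize their output.
  static final BlockingQueue<Map.Entry<Text, Text>> PIPE =
      new LinkedBlockingQueue<>(1);

  static final RecordWriter<Text, Text> UPSTREAM_WRITER =
      new RecordWriter<Text, Text>() {
        @Override
        public void write(Text key, Text value) throws InterruptedException {
          // copy, because Hadoop reuses Writable instances between records
          PIPE.put(new AbstractMap.SimpleImmutableEntry<>(
              new Text(key), new Text(value)));
        }
        @Override
        public void close(TaskAttemptContext context) { }
      };
  // A downstream RecordReader would PIPE.take() inside nextKeyValue().
}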
@Test
public void testCloneMapContext() throws Exception {
  TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
  TaskAttemptID taskAttemptid = new TaskAttemptID(taskId, 0);
  MapContext<IntWritable, IntWritable, IntWritable, IntWritable> mapContext =
      new MapContextImpl<IntWritable, IntWritable, IntWritable, IntWritable>(
          conf, taskAttemptid, null, null, null, null, null);
  Mapper<IntWritable, IntWritable, IntWritable, IntWritable>.Context mapperContext =
      new WrappedMapper<IntWritable, IntWritable, IntWritable, IntWritable>()
          .getMapContext(mapContext);
  ContextFactory.cloneMapContext(mapperContext, conf, null, null);
}
@SuppressWarnings({"unchecked", "rawtypes"}) @Test (timeout=30000) public void testSleepMapper() throws Exception { SleepJob.SleepMapper test = new SleepJob.SleepMapper(); Configuration conf = new Configuration(); conf.setInt(JobContext.NUM_REDUCES, 2); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); TaskAttemptID taskId = new TaskAttemptID(); FakeRecordLLReader reader = new FakeRecordLLReader(); LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter(); OutputCommitter committer = new CustomOutputCommitter(); StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter(); SleepSplit split = getSleepSplit(); MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>( conf, taskId, reader, writer, committer, reporter, split); Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>() .getMapContext(mapcontext); long start = System.currentTimeMillis(); LOG.info("start:" + start); LongWritable key = new LongWritable(start + 2000); LongWritable value = new LongWritable(start + 2000); // should slip 2 sec test.map(key, value, context); LOG.info("finish:" + System.currentTimeMillis()); assertTrue(System.currentTimeMillis() >= (start + 2000)); test.cleanup(context); assertEquals(1, writer.getData().size()); }
public StubContext(Configuration conf,
    RecordReader<Text, FileStatus> reader, int taskId)
    throws IOException, InterruptedException {

  WrappedMapper<Text, FileStatus, Text, Text> wrappedMapper =
      new WrappedMapper<Text, FileStatus, Text, Text>();

  MapContextImpl<Text, FileStatus, Text, Text> contextImpl =
      new MapContextImpl<Text, FileStatus, Text, Text>(conf,
          getTaskAttemptID(taskId), reader, writer, null, reporter, null);

  this.reader = reader;
  this.mapperContext = wrappedMapper.getMapContext(contextImpl);
}
@SuppressWarnings({"unchecked", "rawtypes"}) @Test (timeout=10000) public void testSleepMapper() throws Exception { SleepJob.SleepMapper test = new SleepJob.SleepMapper(); Configuration conf = new Configuration(); conf.setInt(JobContext.NUM_REDUCES, 2); CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true); conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true); TaskAttemptID taskId = new TaskAttemptID(); FakeRecordLLReader reader = new FakeRecordLLReader(); LoadRecordGkNullWriter writer = new LoadRecordGkNullWriter(); OutputCommitter committer = new CustomOutputCommitter(); StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter(); SleepSplit split = getSleepSplit(); MapContext<LongWritable, LongWritable, GridmixKey, NullWritable> mapcontext = new MapContextImpl<LongWritable, LongWritable, GridmixKey, NullWritable>( conf, taskId, reader, writer, committer, reporter, split); Context context = new WrappedMapper<LongWritable, LongWritable, GridmixKey, NullWritable>() .getMapContext(mapcontext); long start = System.currentTimeMillis(); LOG.info("start:" + start); LongWritable key = new LongWritable(start + 2000); LongWritable value = new LongWritable(start + 2000); // should slip 2 sec test.map(key, value, context); LOG.info("finish:" + System.currentTimeMillis()); assertTrue(System.currentTimeMillis() >= (start + 2000)); test.cleanup(context); assertEquals(1, writer.getData().size()); }
@SuppressWarnings("unchecked") @Override protected void setup(final Context context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); @SuppressWarnings("unchecked") Class<Mapper>[] mappersClass = (Class<Mapper>[]) conf.getClasses(CONF_KEY); mappers = new ArrayList<Mapper>(mappersClass.length); cleanups = new ArrayList<Method>(mappersClass.length); maps = new ArrayList<Method>(mappersClass.length); WrappedMapper wrappedMapper = new WrappedMapper(); contexts = Lists.newArrayList(); int[] redirectToReducer = context.getConfiguration().getInts(MultiJob.REDIRECT_TO_REDUCER); for (int i = 0; i < mappersClass.length; i++) { Class<Mapper> mapperClass = mappersClass[i]; final int finalI = redirectToReducer[i]; WrappedMapper.Context myContext = wrappedMapper.new Context(context) { @Override public void write(Object k, Object v) throws IOException, InterruptedException { context.write(new PerMapperOutputKey(finalI, k), new PerMapperOutputValue(finalI, v)); } }; contexts.add(myContext); Mapper mapper = ReflectionUtils.newInstance(mapperClass, conf); mappers.add(mapper); Methods.invoke(Methods.get(mapperClass, "setup", Context.class), mapper, myContext); cleanups.add(Methods.get(mapperClass, "cleanup", Context.class)); maps.add(Methods.getWithNameMatches(mapperClass, "map")); } }