/**
 * Reads every record in the split and returns the record bodies as strings.
 */
private static List<String> readSplit(FixedLengthInputFormat format,
    InputSplit split, Job job) throws Exception {
  List<String> result = new ArrayList<String>();
  TaskAttemptContext context = MapReduceTestUtil
      .createDummyMapTaskAttemptContext(job.getConfiguration());
  RecordReader<LongWritable, BytesWritable> reader =
      format.createRecordReader(split, context);
  MapContext<LongWritable, BytesWritable, LongWritable, BytesWritable> mcontext =
      new MapContextImpl<LongWritable, BytesWritable, LongWritable, BytesWritable>(
          job.getConfiguration(), context.getTaskAttemptID(), reader,
          null, null, MapReduceTestUtil.createDummyReporter(), split);
  reader.initialize(split, mcontext);
  LongWritable key;
  BytesWritable value;
  while (reader.nextKeyValue()) {
    key = reader.getCurrentKey();
    value = reader.getCurrentValue();
    result.add(new String(value.getBytes(), 0, value.getLength()));
  }
  reader.close();
  return result;
}
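// Usage sketch (hypothetical path and record length, not from the original
// tests): configure FixedLengthInputFormat, split a fixed-width file, and
// read each split back through readSplit() above.
private static void readFixedWidthFileExample() throws Exception {
  Job job = Job.getInstance(new Configuration());
  FixedLengthInputFormat format = new FixedLengthInputFormat();
  FixedLengthInputFormat.setRecordLength(job.getConfiguration(), 10);
  FileInputFormat.setInputPaths(job, new Path("/tmp/fixed-width-input"));
  for (InputSplit split : format.getSplits(job)) {
    List<String> records = readSplit(format, split, job);
    // each entry is one 10-byte record from the file
  }
}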
private static List<Text> readSplit(KeyValueTextInputFormat format,
    InputSplit split, Job job) throws IOException, InterruptedException {
  List<Text> result = new ArrayList<Text>();
  Configuration conf = job.getConfiguration();
  TaskAttemptContext context =
      MapReduceTestUtil.createDummyMapTaskAttemptContext(conf);
  RecordReader<Text, Text> reader = format.createRecordReader(split, context);
  MapContext<Text, Text, Text, Text> mcontext =
      new MapContextImpl<Text, Text, Text, Text>(conf,
          context.getTaskAttemptID(), reader, null, null,
          MapReduceTestUtil.createDummyReporter(), split);
  reader.initialize(split, mcontext);
  while (reader.nextKeyValue()) {
    result.add(new Text(reader.getCurrentValue()));
  }
  reader.close();
  return result;
}
private static List<Text> readSplit(InputFormat<LongWritable, Text> format,
    InputSplit split, Job job) throws IOException, InterruptedException {
  List<Text> result = new ArrayList<Text>();
  Configuration conf = job.getConfiguration();
  TaskAttemptContext context =
      MapReduceTestUtil.createDummyMapTaskAttemptContext(conf);
  RecordReader<LongWritable, Text> reader =
      format.createRecordReader(split, context);
  MapContext<LongWritable, Text, LongWritable, Text> mcontext =
      new MapContextImpl<LongWritable, Text, LongWritable, Text>(conf,
          context.getTaskAttemptID(), reader, null, null,
          MapReduceTestUtil.createDummyReporter(), split);
  reader.initialize(split, mcontext);
  while (reader.nextKeyValue()) {
    result.add(new Text(reader.getCurrentValue()));
  }
  reader.close(); // release the reader; the original version leaked it
  return result;
}
/**
 * Adds a mapper (the first mapper) that reads input from the input
 * context and writes to the queue.
 */
@SuppressWarnings("unchecked")
void addMapper(TaskInputOutputContext inputContext,
    ChainBlockingQueue<KeyValuePair<?, ?>> output, int index)
    throws IOException, InterruptedException {
  Configuration conf = getConf(index);
  Class<?> keyOutClass = conf.getClass(MAPPER_OUTPUT_KEY_CLASS, Object.class);
  Class<?> valueOutClass =
      conf.getClass(MAPPER_OUTPUT_VALUE_CLASS, Object.class);
  RecordReader rr = new ChainRecordReader(inputContext);
  RecordWriter rw =
      new ChainRecordWriter(keyOutClass, valueOutClass, output, conf);
  Mapper.Context mapperContext =
      createMapContext(rr, rw, (MapContext) inputContext, getConf(index));
  MapRunner runner = new MapRunner(mappers.get(index), mapperContext, rr, rw);
  threads.add(runner);
}
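// Illustrative only (plain java.util.concurrent, not the Chain internals):
// the handoff above amounts to one thread writing key/value pairs into a
// bounded queue while the thread running the next mapper drains it.
private static void queueHandoffSketch() throws InterruptedException {
  final BlockingQueue<Map.Entry<String, String>> queue =
      new ArrayBlockingQueue<Map.Entry<String, String>>(16);
  Thread producer = new Thread(() -> {
    try {
      // stands in for ChainRecordWriter pushing a mapper's output
      queue.put(new AbstractMap.SimpleEntry<String, String>("k", "v"));
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  });
  producer.start();
  // stands in for ChainRecordReader feeding the next mapper in the chain
  Map.Entry<String, String> pair = queue.take();
  producer.join();
}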
@SuppressWarnings({"rawtypes", "unchecked"})
@Test (timeout=10000)
public void testLoadMapper() throws Exception {
  Configuration conf = new Configuration();
  conf.setInt(JobContext.NUM_REDUCES, 2);

  CompressionEmulationUtil.setCompressionEmulationEnabled(conf, true);
  conf.setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);

  TaskAttemptID taskId = new TaskAttemptID();
  RecordReader<NullWritable, GridmixRecord> reader = new FakeRecordReader();
  LoadRecordGkGrWriter writer = new LoadRecordGkGrWriter();
  OutputCommitter committer = new CustomOutputCommitter();
  StatusReporter reporter = new TaskAttemptContextImpl.DummyReporter();
  LoadSplit split = getLoadSplit();

  MapContext<NullWritable, GridmixRecord, GridmixKey, GridmixRecord> mapContext =
      new MapContextImpl<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>(
          conf, taskId, reader, writer, committer, reporter, split);
  // context
  Context ctx =
      new WrappedMapper<NullWritable, GridmixRecord, GridmixKey, GridmixRecord>()
          .getMapContext(mapContext);

  reader.initialize(split, ctx);
  ctx.getConfiguration().setBoolean(MRJobConfig.MAP_OUTPUT_COMPRESS, true);
  CompressionEmulationUtil.setCompressionEmulationEnabled(
      ctx.getConfiguration(), true);

  LoadJob.LoadMapper mapper = new LoadJob.LoadMapper();
  // setup, map, cleanup
  mapper.run(ctx);

  Map<GridmixKey, GridmixRecord> data = writer.getData();
  // check result
  assertEquals(2, data.size());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static <K1, V1, K2, V2> Mapper<K1, V1, K2, V2>.Context
    buildNewMapperContext(Configuration configuration,
        RecordWriter<K2, V2> output) throws Exception {
  Class<?> mapContextImplClass =
      Class.forName("org.apache.hadoop.mapreduce.task.MapContextImpl");
  Constructor<?> cons = mapContextImplClass.getConstructors()[0];
  Object mapContextImpl = cons.newInstance(configuration,
      new TaskAttemptID(), null, output, null, new DummyStatusReporter(), null);

  Class<?> wrappedMapperClass =
      Class.forName("org.apache.hadoop.mapreduce.lib.map.WrappedMapper");
  Object wrappedMapper = wrappedMapperClass.newInstance();
  Method getMapContext =
      wrappedMapperClass.getMethod("getMapContext", MapContext.class);
  return (Mapper.Context) getMapContext.invoke(wrappedMapper, mapContextImpl);
}
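// Hypothetical caller (a sketch; the key/value types are placeholders):
// hand buildNewMapperContext() an in-memory RecordWriter so that anything
// written through the reflective context can be inspected afterwards.
private static void buildContextExample() throws Exception {
  final List<String> collected = new ArrayList<String>();
  RecordWriter<Text, IntWritable> writer = new RecordWriter<Text, IntWritable>() {
    @Override
    public void write(Text key, IntWritable value) {
      collected.add(key + "\t" + value);
    }
    @Override
    public void close(TaskAttemptContext context) {
    }
  };
  Mapper<LongWritable, Text, Text, IntWritable>.Context ctx =
      buildNewMapperContext(new Configuration(), writer);
  ctx.write(new Text("word"), new IntWritable(1)); // lands in 'collected'
}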
private static List<Text> readSplit(KeyValueTextInputFormat format,
    InputSplit split, Job job) throws IOException, InterruptedException {
  List<Text> result = new ArrayList<Text>();
  Configuration conf = job.getConfiguration();
  TaskAttemptContext context =
      MapReduceTestUtil.createDummyMapTaskAttemptContext(conf);
  RecordReader<Text, Text> reader = format.createRecordReader(split, context);
  MapContext<Text, Text, Text, Text> mcontext =
      new MapContextImpl<Text, Text, Text, Text>(conf,
          context.getTaskAttemptID(), reader, null, null,
          MapReduceTestUtil.createDummyReporter(), split);
  reader.initialize(split, mcontext);
  while (reader.nextKeyValue()) {
    result.add(new Text(reader.getCurrentValue()));
  }
  reader.close(); // release the reader; the original version leaked it
  return result;
}
/**
 * Sets the progress of the current task.
 *
 * Note: works only when using a Virtual Input Format.
 *
 * @param value value of the progress; must lie within [0.0, 1.0]
 */
public static void setProgress(float value) {
  if (PhaseContext.isIntialized()) {
    final MapContext mapContext = PhaseContext.getMapContext();
    try {
      final FloatWritable progress = (FloatWritable) mapContext.getCurrentKey();
      progress.set(value);
      mapContext.nextKeyValue();
    } catch (Exception e) {
      System.err.println("Unable to report progress in Load Cyclic. Exception: " + e);
      e.printStackTrace();
    }
  }
}
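// Hypothetical call site (sketch): a long-running map task driven by the
// virtual input format reports coarse progress as it works through a fixed
// number of steps; doExpensiveWork() is a placeholder workload.
private static void runWithProgress(int steps) {
  for (int i = 0; i < steps; i++) {
    doExpensiveWork(i); // placeholder
    setProgress((i + 1) / (float) steps); // always within [0.0, 1.0]
  }
}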
private void writeDebugHeader() {
  processError("===== Task Information Header =====");
  processError("\nCommand: " + command);
  processError("\nStart time: " + new Date(System.currentTimeMillis()));
  if (job.getBoolean("mapred.task.is.map", false)) {
    MapContext context = (MapContext) PigMapReduce.sJobContext;
    PigSplit pigSplit = (PigSplit) context.getInputSplit();
    InputSplit wrappedSplit = pigSplit.getWrappedSplit();
    if (wrappedSplit instanceof FileSplit) {
      FileSplit mapInputFileSplit = (FileSplit) wrappedSplit;
      processError("\nInput-split file: " + mapInputFileSplit.getPath().toString());
      processError("\nInput-split start-offset: "
          + Long.toString(mapInputFileSplit.getStart()));
      processError("\nInput-split length: "
          + Long.toString(mapInputFileSplit.getLength()));
    }
  }
  processError("\n===== * * * =====\n");
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
    throws IOException, InterruptedException {
  FileSplit fileSplit = (FileSplit) split;
  inputByteCounter = ((MapContext) context).getCounter(
      FileInputFormat.COUNTER_GROUP, FileInputFormat.BYTES_READ);
  conf = context.getConfiguration();
  Path path = fileSplit.getPath();
  FileSystem fs = path.getFileSystem(conf);
  this.in = new SequenceFile.Reader(fs, path, conf);
  this.end = fileSplit.getStart() + fileSplit.getLength();
  if (fileSplit.getStart() > in.getPosition()) {
    in.sync(fileSplit.getStart()); // sync to start
  }
  this.start = in.getPosition();
  more = start < end;
}
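// The nextKeyValue() that typically pairs with this initialize() (a sketch
// modeled on Hadoop's stock SequenceFileRecordReader; the generic 'key' and
// 'value' fields are assumed, not shown above): read the next record and
// stop once the reader has passed 'end' and seen a sync marker.
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  if (!more) {
    return false;
  }
  long pos = in.getPosition();
  key = (K) in.next(key);
  if (key == null || (pos >= end && in.syncSeen())) {
    // no record left, or we read past the split boundary
    more = false;
    key = null;
    value = null;
  } else {
    value = (V) in.getCurrentValue(value);
  }
  return more;
}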