private void runIOTest(
    Class<? extends Mapper<Text, LongWritable, Text, Text>> mapperClass,
    Path outputDir) throws IOException {
  JobConf job = new JobConf(config, TestDFSIO.class);

  FileInputFormat.setInputPaths(job, getControlDir(config));
  job.setInputFormat(SequenceFileInputFormat.class);
  job.setMapperClass(mapperClass);
  job.setReducerClass(AccumulatingReducer.class);

  FileOutputFormat.setOutputPath(job, outputDir);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(1);
  JobClient.runJob(job);
}
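// How the read and write benchmarks would invoke the helper above; a hedged
// sketch -- WriteMapper/ReadMapper and the getWriteDir/getReadDir helpers
// follow TestDFSIO's naming but are assumptions, not shown in this excerpt:
runIOTest(WriteMapper.class, getWriteDir(config));
runIOTest(ReadMapper.class, getReadDir(config));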
public void configure(JobConf job) {
  super.configure(job);

  // Optional Mapper applied to the reduce output after the streaming command.
  Class<?> c = job.getClass("stream.reduce.posthook", null, Mapper.class);
  if (c != null) {
    postMapper = (Mapper) ReflectionUtils.newInstance(c, job);
    LOG.info("PostHook=" + c.getName());
  }

  // Optional Reducer applied to the input before the streaming command;
  // its output is buffered in memory before being fed downstream.
  c = job.getClass("stream.reduce.prehook", null, Reducer.class);
  if (c != null) {
    preReducer = (Reducer) ReflectionUtils.newInstance(c, job);
    oc = new InmemBufferingOutputCollector();
    LOG.info("PreHook=" + c.getName());
  }

  this.ignoreKey = job.getBoolean("stream.reduce.ignoreKey", false);
}
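// How the hooks above might be enabled at job-setup time; a minimal sketch,
// assuming MyPostMapper implements Mapper and MyPreReducer implements
// Reducer (both hypothetical classes, not part of this excerpt):
JobConf job = new JobConf();
job.setClass("stream.reduce.posthook", MyPostMapper.class, Mapper.class);
job.setClass("stream.reduce.prehook", MyPreReducer.class, Reducer.class);
job.setBoolean("stream.reduce.ignoreKey", true); // emit values without keys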
@Test
public void testAddInputPathWithMapper() {
  final JobConf conf = new JobConf();
  MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
      MapClass.class);
  MultipleInputs.addInputPath(conf, new Path("/bar"),
      KeyValueTextInputFormat.class, MapClass2.class);

  final Map<Path, InputFormat> inputs = MultipleInputs.getInputFormatMap(conf);
  final Map<Path, Class<? extends Mapper>> maps =
      MultipleInputs.getMapperTypeMap(conf);

  assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
  assertEquals(KeyValueTextInputFormat.class,
      inputs.get(new Path("/bar")).getClass());
  assertEquals(MapClass.class, maps.get(new Path("/foo")));
  assertEquals(MapClass2.class, maps.get(new Path("/bar")));
}
@SuppressWarnings("unchecked") public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector, Reporter reporter) throws IOException { if (mapper == null) { // Find the Mapper from the TaggedInputSplit. TaggedInputSplit inputSplit = (TaggedInputSplit) reporter.getInputSplit(); mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit .getMapperClass(), conf); } mapper.map(key, value, outputCollector, reporter); }
/**
 * Creates a new TaggedInputSplit.
 *
 * @param inputSplit The InputSplit to be tagged
 * @param conf The configuration to use
 * @param inputFormatClass The InputFormat class to use for this split
 * @param mapperClass The Mapper class to use for this split
 */
public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {
  this.inputSplitClass = inputSplit.getClass();
  this.inputSplit = inputSplit;
  this.conf = conf;
  this.inputFormatClass = inputFormatClass;
  this.mapperClass = mapperClass;
}
@SuppressWarnings("unchecked") public void readFields(DataInput in) throws IOException { inputSplitClass = (Class<? extends InputSplit>) readClass(in); inputSplit = (InputSplit) ReflectionUtils .newInstance(inputSplitClass, conf); inputSplit.readFields(in); inputFormatClass = (Class<? extends InputFormat>) readClass(in); mapperClass = (Class<? extends Mapper>) readClass(in); }
/**
 * Chains the <code>map(...)</code> methods of the Mappers in the chain.
 */
@SuppressWarnings({"unchecked"})
public void map(Object key, Object value, OutputCollector output,
    Reporter reporter) throws IOException {
  Mapper mapper = chain.getFirstMap();
  if (mapper != null) {
    mapper.map(key, value, chain.getMapperCollector(0, output, reporter),
        reporter);
  }
}
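// How a chain like the one above is typically assembled with
// ChainMapper.addMapper; a hedged sketch in which AMap and BMap are
// hypothetical Mapper implementations and MyJob is a placeholder job class:
JobConf job = new JobConf(conf, MyJob.class);
job.setJobName("chain-example");
job.setInputFormat(TextInputFormat.class);

// First link: LongWritable/Text in, Text/Text out. byValue=true copies
// keys and values between links so one link cannot mutate another's objects.
ChainMapper.addMapper(job, AMap.class, LongWritable.class, Text.class,
    Text.class, Text.class, true, new JobConf(false));
// Second link consumes the first link's Text/Text output.
ChainMapper.addMapper(job, BMap.class, Text.class, Text.class,
    Text.class, Text.class, true, new JobConf(false));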
/**
 * Add a {@link Path} with a custom {@link InputFormat} and
 * {@link Mapper} to the list of inputs for the map-reduce job.
 *
 * @param conf The configuration of the job
 * @param path {@link Path} to be added to the list of inputs for the job
 * @param inputFormatClass {@link InputFormat} class to use for this path
 * @param mapperClass {@link Mapper} class to use for this path
 */
public static void addInputPath(JobConf conf, Path path,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {
  addInputPath(conf, path, inputFormatClass);

  // Record the path -> mapper mapping as "path;mapperClass", comma-separated
  // across calls, and route all map tasks through DelegatingMapper, which
  // instantiates the per-path mapper at runtime.
  String mapperMapping = path.toString() + ";" + mapperClass.getName();
  String mappers = conf.get("mapreduce.input.multipleinputs.dir.mappers");
  conf.set("mapreduce.input.multipleinputs.dir.mappers",
      mappers == null ? mapperMapping : mappers + "," + mapperMapping);

  conf.setMapperClass(DelegatingMapper.class);
}
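// Typical wiring for a job that reads two directories with different formats
// and mappers; a hedged sketch in which TextMapper and KeyValueMapper are
// hypothetical Mapper implementations and the paths are placeholders:
JobConf conf = new JobConf();
MultipleInputs.addInputPath(conf, new Path("/logs/text"),
    TextInputFormat.class, TextMapper.class);
MultipleInputs.addInputPath(conf, new Path("/logs/kv"),
    KeyValueTextInputFormat.class, KeyValueMapper.class);
// Both paths now feed the same reduce phase; DelegatingMapper selects the
// right mapper per input split via the TaggedInputSplit shown earlier.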