/**
 * Registers "/foo" and "/bar" with distinct input formats and mappers via MultipleInputs, then
 * verifies that the format instance and mapper class recorded for each path round-trip through
 * the JobConf.
 */
public void testAddInputPathWithMapper() {
  JobConf jobConf = new JobConf();
  Path fooPath = new Path("/foo");
  Path barPath = new Path("/bar");

  MultipleInputs.addInputPath(jobConf, fooPath, TextInputFormat.class, MapClass.class);
  MultipleInputs.addInputPath(jobConf, barPath, KeyValueTextInputFormat.class, MapClass2.class);

  Map<Path, InputFormat> formatsByPath = MultipleInputs.getInputFormatMap(jobConf);
  Map<Path, Class<? extends Mapper>> mappersByPath = MultipleInputs.getMapperTypeMap(jobConf);

  InputFormat fooFormat = formatsByPath.get(fooPath);
  assertEquals(TextInputFormat.class, fooFormat.getClass());
  InputFormat barFormat = formatsByPath.get(barPath);
  assertEquals(KeyValueTextInputFormat.class, barFormat.getClass());
  assertEquals(MapClass.class, mappersByPath.get(fooPath));
  assertEquals(MapClass2.class, mappersByPath.get(barPath));
}
/**
 * Verifies that MultipleInputs stores a per-path input format and mapper class in the JobConf
 * and returns them unchanged from getInputFormatMap / getMapperTypeMap.
 */
@Test
public void testAddInputPathWithMapper() {
  JobConf jobConf = new JobConf();
  Path foo = new Path("/foo");
  Path bar = new Path("/bar");

  // Two paths, each with its own format and mapper.
  MultipleInputs.addInputPath(jobConf, foo, TextInputFormat.class, MapClass.class);
  MultipleInputs.addInputPath(jobConf, bar, KeyValueTextInputFormat.class, MapClass2.class);

  Map<Path, InputFormat> formats = MultipleInputs.getInputFormatMap(jobConf);
  Map<Path, Class<? extends Mapper>> mapperTypes = MultipleInputs.getMapperTypeMap(jobConf);

  assertEquals(TextInputFormat.class, formats.get(foo).getClass());
  assertEquals(KeyValueTextInputFormat.class, formats.get(bar).getClass());
  assertEquals(MapClass.class, mapperTypes.get(foo));
  assertEquals(MapClass2.class, mapperTypes.get(bar));
}
/**
 * Registers "/foo" and "/bar" with different input formats (no mapper) and checks that
 * MultipleInputs.getInputFormatMap returns an instance of the matching format for each path.
 */
public void testAddInputPathWithFormat() {
  JobConf jobConf = new JobConf();
  Path fooPath = new Path("/foo");
  Path barPath = new Path("/bar");

  MultipleInputs.addInputPath(jobConf, fooPath, TextInputFormat.class);
  MultipleInputs.addInputPath(jobConf, barPath, KeyValueTextInputFormat.class);

  Map<Path, InputFormat> formatsByPath = MultipleInputs.getInputFormatMap(jobConf);

  Class<?> fooFormatType = formatsByPath.get(fooPath).getClass();
  assertEquals(TextInputFormat.class, fooFormatType);
  Class<?> barFormatType = formatsByPath.get(barPath).getClass();
  assertEquals(KeyValueTextInputFormat.class, barFormatType);
}
/**
 * Checks which input format StreamJob.createJob selects for three option combinations:
 * KeyValueTextInputFormat alone, SequenceFileInputFormat alone, and KeyValueTextInputFormat
 * combined with an -inputreader (which is expected to switch the job to StreamInputFormat).
 */
@Test
public void testCreateJob() throws IOException {
  // Mandatory streaming options shared by every case below.
  String[] requiredOptions = {
    "-input", "dummy", "-output", "dummy", "-mapper", "dummy", "-reducer", "dummy"
  };
  ArrayList<String> baseArgs = new ArrayList<String>();
  for (String option : requiredOptions) {
    baseArgs.add(option);
  }

  // Case 1: explicit KeyValueTextInputFormat.
  ArrayList<String> kvArgs = new ArrayList<String>(baseArgs);
  kvArgs.add("-inputformat");
  kvArgs.add("org.apache.hadoop.mapred.KeyValueTextInputFormat");
  JobConf kvJob = StreamJob.createJob(kvArgs.toArray(new String[0]));
  assertEquals(KeyValueTextInputFormat.class, kvJob.getInputFormat().getClass());

  // Case 2: explicit SequenceFileInputFormat.
  ArrayList<String> seqArgs = new ArrayList<String>(baseArgs);
  seqArgs.add("-inputformat");
  seqArgs.add("org.apache.hadoop.mapred.SequenceFileInputFormat");
  JobConf seqJob = StreamJob.createJob(seqArgs.toArray(new String[0]));
  assertEquals(SequenceFileInputFormat.class, seqJob.getInputFormat().getClass());

  // Case 3: an -inputreader spec alongside -inputformat.
  ArrayList<String> readerArgs = new ArrayList<String>(baseArgs);
  readerArgs.add("-inputformat");
  readerArgs.add("org.apache.hadoop.mapred.KeyValueTextInputFormat");
  readerArgs.add("-inputreader");
  readerArgs.add("StreamXmlRecordReader,begin=<doc>,end=</doc>");
  JobConf readerJob = StreamJob.createJob(readerArgs.toArray(new String[0]));
  assertEquals(StreamInputFormat.class, readerJob.getInputFormat().getClass());
}
private InputSplit[] getSplits(JobConf conf, int numSplits, String path) throws Exception { FileInputFormat.setInputPaths(conf, new Path(path)); if (inputFormat == null) { inputFormat = inputFormatClass.newInstance(); String inputFormatClassName = inputFormatClass.getName(); if (inputFormatClassName.equals("org.apache.hadoop.mapred.TextInputFormat")) { ((TextInputFormat)inputFormat).configure(conf); } else if (inputFormatClassName.equals("org.apache.hadoop.mapred.KeyValueTextInputFormat")) { ((KeyValueTextInputFormat)inputFormat).configure(conf); } } return inputFormat.getSplits(conf, numSplits); // return null; }
/**
 * Verifies that MultipleInputs records a per-path input format in the JobConf and that
 * getInputFormatMap returns an instance of the registered format for each path.
 */
@Test
public void testAddInputPathWithFormat() {
  JobConf jobConf = new JobConf();
  Path foo = new Path("/foo");
  Path bar = new Path("/bar");

  MultipleInputs.addInputPath(jobConf, foo, TextInputFormat.class);
  MultipleInputs.addInputPath(jobConf, bar, KeyValueTextInputFormat.class);

  Map<Path, InputFormat> formats = MultipleInputs.getInputFormatMap(jobConf);

  assertEquals(TextInputFormat.class, formats.get(foo).getClass());
  assertEquals(KeyValueTextInputFormat.class, formats.get(bar).getClass());
}
public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf(), "map side join"); Configuration conf = job.getConfiguration(); job.setJarByClass(getClass()); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TupleWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TupleWritable.class); Class<? extends InputFormat> cls = null; job.setInputFormatClass(cls); // job.setInputFormatClass(CompositeInputFormat.class); // 导入路径设置为master和数据两种 TextInputFormat.addInputPaths(job, args[0]); TextInputFormat.addInputPaths(job, args[1]); conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose( "inner", KeyValueTextInputFormat.class, TextInputFormat.getInputPaths(job))); TextOutputFormat.setOutputPath(job, new Path(args[2])); job.setOutputFormatClass(TextOutputFormat.class); return job.waitForCompletion(true)?0:1; }