public int run(String[] args) throws Exception { Job job = Job.getInstance(getConf(), "map side join"); Configuration conf = job.getConfiguration(); job.setJarByClass(getClass()); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(TupleWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(TupleWritable.class); Class<? extends InputFormat> cls = null; job.setInputFormatClass(cls); // job.setInputFormatClass(CompositeInputFormat.class); // 导入路径设置为master和数据两种 TextInputFormat.addInputPaths(job, args[0]); TextInputFormat.addInputPaths(job, args[1]); conf.set(CompositeInputFormat.JOIN_EXPR, CompositeInputFormat.compose( "inner", KeyValueTextInputFormat.class, TextInputFormat.getInputPaths(job))); TextOutputFormat.setOutputPath(job, new Path(args[2])); job.setOutputFormatClass(TextOutputFormat.class); return job.waitForCompletion(true)?0:1; }
@Override public int run(String[] args) throws Exception { if (args.length != 4) { printUsage(); } Path userPath = new Path(args[0]); Path commentPath = new Path(args[1]); Path outputDir = new Path(args[2]); String joinType = args[3]; JobConf conf = new JobConf("CompositeJoin"); conf.setJarByClass(CompositeUserJoin.class); conf.setMapperClass(CompositeMapper.class); conf.setNumReduceTasks(0); // Set the input format class to a CompositeInputFormat class. // The CompositeInputFormat will parse all of our input files and output // records to our mapper. conf.setInputFormat(CompositeInputFormat.class); // The composite input format join expression will set how the records // are going to be read in, and in what input format. conf.set("mapred.join.expr", CompositeInputFormat.compose(joinType, KeyValueTextInputFormat.class, userPath, commentPath)); TextOutputFormat.setOutputPath(conf, outputDir); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); RunningJob job = JobClient.runJob(conf); while (!job.isComplete()) { Thread.sleep(1000); } return job.isSuccessful() ? 0 : 1; }
public static Configuration createMatrixMultiplicationCpuConf( Configuration initialConf, Path aPath, Path bPath, Path outPath, int outCardinality, boolean isDebugging) { JobConf conf = new JobConf(initialConf, MatrixMultiplicationCpu.class); conf.setJobName("MatrixMultiplicationCPU: " + aPath + " x " + bPath + " = " + outPath); conf.setInt(CONF_OUT_CARD, outCardinality); conf.setBoolean(CONF_DEBUG, isDebugging); conf.setInputFormat(CompositeInputFormat.class); conf.set("mapred.join.expr", CompositeInputFormat.compose("inner", SequenceFileInputFormat.class, aPath, bPath)); conf.setOutputFormat(SequenceFileOutputFormat.class); FileOutputFormat.setOutputPath(conf, outPath); conf.setMapperClass(MatrixMultiplyCpuMapper.class); conf.setCombinerClass(MatrixMultiplicationCpuReducer.class); conf.setReducerClass(MatrixMultiplicationCpuReducer.class); conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(VectorWritable.class); conf.setOutputKeyClass(IntWritable.class); conf.setOutputValueClass(VectorWritable.class); // Increase client heap size conf.set("mapred.child.java.opts", "-Xms8G -Xmx8G"); return conf; }
public static Configuration createMatrixMultiplicationGpuConf( Configuration initialConf, Path aPath, Path bPath, Path outPath, int outCardinality, int tileWidth, boolean isDebugging) { JobConf conf = new JobConf(initialConf, MatrixMultiplicationGpu.class); conf.setJobName("MatrixMultiplicationGPU: " + aPath + " x " + bPath + " = " + outPath); conf.setInt(CONF_OUT_CARD, outCardinality); conf.setInt(CONF_TILE_WIDTH, tileWidth); conf.setBoolean(CONF_DEBUG, isDebugging); conf.setInputFormat(CompositeInputFormat.class); conf.set("mapred.join.expr", CompositeInputFormat.compose("inner", SequenceFileInputFormat.class, aPath, bPath)); conf.setOutputFormat(SequenceFileOutputFormat.class); FileOutputFormat.setOutputPath(conf, outPath); conf.setMapperClass(MatrixMultiplyGpuMapper.class); conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(VectorWritable.class); conf.setOutputKeyClass(IntWritable.class); conf.setOutputValueClass(VectorWritable.class); // Increase client heap size for GPU Rootbeer execution conf.set("mapred.child.java.opts", "-Xms8G -Xmx8G"); // No Reduce step is needed // -> 0 reducer means reduce step will be skipped and // mapper output will be the final out // -> Identity reducer means then shuffling/sorting will still take place conf.setNumReduceTasks(0); return conf; }