Java 类org.apache.hadoop.mapreduce.lib.join.CompositeInputFormat 实例源码

项目:hiped2    文件:CompositeJoin.java   
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  Cli cli = Cli.builder().setArgs(args).addOptions(Options.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path usersPath = new Path(cli.getArgValueAsString(Options.USERS));
  Path userLogsPath = new Path(cli.getArgValueAsString(Options.USER_LOGS));
  Path outputPath = new Path(cli.getArgValueAsString(Options.OUTPUT));

  Configuration conf = super.getConf();

  Job job = new Job(conf);

  job.setJarByClass(CompositeJoin.class);
  job.setMapperClass(JoinMap.class);

  job.setInputFormatClass(CompositeInputFormat.class);
  job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
      CompositeInputFormat.compose("inner",
          KeyValueTextInputFormat.class, usersPath, userLogsPath)
  );

  job.setNumReduceTasks(0);

  FileInputFormat.setInputPaths(job, userLogsPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  return job.waitForCompletion(true) ? 0 : 1;
}