/**
 * Builds a map-only MapReduce job that imports data from a Couchbase view
 * into the given HBase table.
 *
 * @param conf the Hadoop configuration the job is based on
 * @param outputTable name of the HBase table the imported rows are written to
 * @return the fully configured, not yet submitted {@link Job}
 * @throws IOException if setting up the table output fails
 */
public Job configureJob(Configuration conf, String outputTable) throws IOException {
  // Tolerate a small fraction of failed map tasks. Both key spellings are
  // set — presumably to cover old (mapred.*) and new (mapreduce.*) Hadoop
  // versions; TODO confirm which version this targets.
  conf.setInt("mapreduce.map.failures.maxpercent", 5);
  conf.setInt("mapred.max.map.failures.percent", 5);
  conf.setInt("mapred.max.tracker.failures", 20);
  // Note: the settings above must be applied before getInstance() snapshots
  // the configuration into the job.
  Job job = Job.getInstance(conf);
  job.setJarByClass(CouchbaseViewToHBaseImporter.class);
  // Input
  job.setInputFormatClass(CouchbaseViewInputFormat.class);
  // Mapper
  job.setMapperClass(CouchbaseViewToHBaseMapper.class);
  // Reducer: none — map-only, the mappers emit directly to the table output.
  job.setNumReduceTasks(0);
  // Output
  TableMapReduceUtil.initTableReducerJob(outputTable, IdentityTableReducer.class, job);
  return job;
}
/** * Main entry point. * * @param args The command line parameters. * @throws Exception When running the job fails. */ // vv ParseJson public static void main(String[] args) throws Exception { /*...*/ // ^^ ParseJson Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); CommandLine cmd = parseArgs(otherArgs); // check debug flag and other options if (cmd.hasOption("d")) conf.set("conf.debug", "true"); // get details String input = cmd.getOptionValue("i"); String output = cmd.getOptionValue("o"); String column = cmd.getOptionValue("c"); // vv ParseJson Scan scan = new Scan(); if (column != null) { byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); if (colkey.length > 1) { scan.addColumn(colkey[0], colkey[1]); conf.set("conf.columnfamily", Bytes.toStringBinary(colkey[0])); // co ParseJson-2-Conf Store the column family in the configuration for later use in the mapper. conf.set("conf.columnqualifier", Bytes.toStringBinary(colkey[1])); } else { scan.addFamily(colkey[0]); conf.set("conf.columnfamily", Bytes.toStringBinary(colkey[0])); } } Job job = Job.getInstance(conf, "Parse data in " + input + ", write to " + output); job.setJarByClass(ParseJson.class); TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, // co ParseJson-3-SetMap Setup map phase details using the utility method. ImmutableBytesWritable.class, Put.class, job); TableMapReduceUtil.initTableReducerJob(output, // co ParseJson-4-SetReduce Configure an identity reducer to store the parsed data. IdentityTableReducer.class, job); System.exit(job.waitForCompletion(true) ? 0 : 1); }
/** * Main entry point. * * @param args The command line parameters. * @throws Exception When running the job fails. */ public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); CommandLine cmd = parseArgs(otherArgs); // check debug flag and other options if (cmd.hasOption("d")) conf.set("conf.debug", "true"); // get details String input = cmd.getOptionValue("i"); String output = cmd.getOptionValue("o"); String column = cmd.getOptionValue("c"); Scan scan = new Scan(); if (column != null) { byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column)); if (colkey.length > 1) { scan.addColumn(colkey[0], colkey[1]); conf.set("conf.columnfamily", Bytes.toStringBinary(colkey[0])); // co ParseJson2-2-Conf Store the column family in the configuration for later use in the mapper. conf.set("conf.columnqualifier", Bytes.toStringBinary(colkey[1])); } else { scan.addFamily(colkey[0]); conf.set("conf.columnfamily", Bytes.toStringBinary(colkey[0])); } } // vv ParseJson2 /*...*/ Job job = Job.getInstance(conf, "Parse data in " + input + ", write to " + output + "(map only)"); job.setJarByClass(ParseJson2.class); TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class, job); TableMapReduceUtil.initTableReducerJob(output, IdentityTableReducer.class, job); /*[*/job.setNumReduceTasks(0);/*]*/ /*...*/ // ^^ ParseJson2 System.exit(job.waitForCompletion(true) ? 0 : 1); }
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code (0 on success, non-zero on failure)
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {
  Cli cli = Cli.builder().setArgs(args).addOptions(CliCommonOpts.MrIoOpts.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path inputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.INPUT));
  Path outputPath = new Path(cli.getArgValueAsString(CliCommonOpts.MrIoOpts.OUTPUT));

  Configuration conf = super.getConf();

  // Make sure the destination table and its column family exist before the
  // job starts writing into them.
  HBaseWriter.createTableAndColumn(conf, STOCKS_IMPORT_TABLE_NAME,
      HBaseWriter.STOCK_DETAILS_COLUMN_FAMILY_AS_BYTES);

  // Job.getInstance(conf) replaces the deprecated Job(Configuration)
  // constructor.
  Job job = Job.getInstance(conf);
  job.setJarByClass(HBaseSinkMapReduce.class);

  // Identity reducer that writes the mapper-emitted Puts into the table.
  TableMapReduceUtil.initTableReducerJob(
      STOCKS_IMPORT_TABLE_NAME, IdentityTableReducer.class, job);

  job.setMapperClass(MapClass.class);
  job.setMapOutputKeyClass(StockPriceWritable.class);
  job.setMapOutputValueClass(Put.class);

  FileInputFormat.setInputPaths(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  // Remove leftover output from a previous run so the job starts clean.
  outputPath.getFileSystem(conf).delete(outputPath, true);

  return job.waitForCompletion(true) ? 0 : 1;
}