public void run() throws Exception {
  String tableName = "contacts";
  Configuration config = HBaseConfiguration.create();

  Scan scan = new Scan();
  scan.setCaching(500);        // the Scan default of 1 is bad for MapReduce jobs
  scan.setCacheBlocks(false);  // don't set to true for MR jobs
  config.set(TableInputFormat.SCAN, convertScanToString(scan));
  config.set(TableInputFormat.INPUT_TABLE, tableName);

  Job job = new Job(config, "index builder");
  job.setJarByClass(JobSubmitter.class);
  job.setMapperClass(IndexMapper.class);
  job.setNumReduceTasks(0); // map-only job
  job.setInputFormatClass(TableInputFormat.class);
  job.setOutputFormatClass(MultiTableOutputFormat.class);

  boolean b = job.waitForCompletion(true);
  if (!b) {
    throw new IOException("error with job!");
  }
}
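// The convertScanToString() helper called above is not shown in the original.
// A minimal sketch, assuming the pre-0.96 HBase API where Scan implements
// Writable (newer HBase versions expose an equivalent public
// TableMapReduceUtil.convertScanToString()): serialize the Scan and
// Base64-encode it so it can be carried in the job Configuration.
// Requires: java.io.ByteArrayOutputStream, java.io.DataOutputStream,
// org.apache.hadoop.hbase.util.Base64
private static String convertScanToString(Scan scan) throws IOException {
  ByteArrayOutputStream buffer = new ByteArrayOutputStream();
  DataOutputStream out = new DataOutputStream(buffer);
  scan.write(out); // Writable serialization (HBase 0.94.x and earlier)
  return Base64.encodeBytes(buffer.toByteArray());
}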
/**
 * @param conf to use to create and run the job
 * @param scan to be used to scan the raw table
 * @param totalJobCount the total number of jobs that need to be run in this
 *          batch; used in the job name
 * @return the job to be submitted to the cluster
 * @throws IOException
 */
private Job getProcessingJob(Configuration conf, Scan scan, int totalJobCount)
    throws IOException {
  Configuration confClone = new Configuration(conf);
  // Turn off speculative execution.
  // Note: this must happen BEFORE the job is constructed with the new
  // mapreduce API.
  confClone.setBoolean("mapred.map.tasks.speculative.execution", false);

  // Set up the job
  Job job = new Job(confClone, getJobName(totalJobCount));

  // This is a map-only job, skip the reduce step
  job.setNumReduceTasks(0);
  job.setJarByClass(JobFileProcessor.class);
  job.setOutputFormatClass(MultiTableOutputFormat.class);

  TableMapReduceUtil.initTableMapperJob(Constants.HISTORY_RAW_TABLE, scan,
      JobFileTableMapper.class,
      JobFileTableMapper.getOutputKeyClass(),
      JobFileTableMapper.getOutputValueClass(), job);

  return job;
}
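// JobFileTableMapper itself is not shown. A minimal sketch of the pattern it
// relies on: with MultiTableOutputFormat the map output key is an
// ImmutableBytesWritable naming the *destination table*, and the value is the
// Put (or Delete) to apply there. The table name below is a hypothetical
// placeholder, and the KeyValue-based API matches the older HBase versions
// used elsewhere in these listings.
public static class JobFileTableMapper
    extends TableMapper<ImmutableBytesWritable, Put> {

  // Hypothetical destination table for illustration only
  private static final ImmutableBytesWritable HISTORY_TABLE =
      new ImmutableBytesWritable(Bytes.toBytes("job_history"));

  public static Class<ImmutableBytesWritable> getOutputKeyClass() {
    return ImmutableBytesWritable.class;
  }

  public static Class<Put> getOutputValueClass() {
    return Put.class;
  }

  @Override
  protected void map(ImmutableBytesWritable row, Result value, Context context)
      throws IOException, InterruptedException {
    // Re-emit the raw row into a target table: the key selects the table,
    // the Put carries the data.
    Put put = new Put(row.get());
    for (KeyValue kv : value.raw()) {
      put.add(kv);
    }
    context.write(HISTORY_TABLE, put);
  }
}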
public static void main(String[] args) throws Exception {
  String rootDir = "hdfs://hadoop1:8020/hbase";
  String zkServer = "hadoop1";
  String port = "2181";

  TwoLevelIndexBuilder conn = new TwoLevelIndexBuilder(rootDir, zkServer, port);
  Configuration conf = conn.conf;
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

  // TwoLevelIndexBuilder: TableName, ColumnFamily, Qualifier...
  if (otherArgs.length < 3) {
    System.err.println(
        "Usage: TwoLevelIndexBuilder <tableName> <columnFamily> <qualifier>...");
    System.exit(-1);
  }

  // Table name
  String tableName = otherArgs[0];
  // Column family
  String columnFamily = otherArgs[1];
  conf.set("tableName", tableName);
  conf.set("columnFamily", columnFamily);

  // Qualifiers (there may be more than one)
  String[] qualifiers = new String[otherArgs.length - 2];
  System.arraycopy(otherArgs, 2, qualifiers, 0, qualifiers.length);
  // Pass the qualifiers to the mapper
  conf.setStrings("qualifiers", qualifiers);

  Job job = new Job(conf, tableName);
  job.setJarByClass(TwoLevelIndexBuilder.class);
  job.setNumReduceTasks(0); // map-only job, no reduce phase needed
  job.setOutputFormatClass(MultiTableOutputFormat.class);

  // initTableMapperJob also sets the mapper class and TableInputFormat,
  // so separate setMapperClass/setInputFormatClass calls are redundant
  TableMapReduceUtil.initTableMapperJob(tableName, new Scan(),
      TwoLevelIndexMapper.class, ImmutableBytesWritable.class, Put.class, job);

  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
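// The TwoLevelIndexMapper class is not included in the original. A minimal
// sketch of what such a secondary-index mapper could look like, under two
// assumptions of ours: each qualifier is indexed into its own table named
// "<table>-<qualifier>", and the index row key is the column value, with a
// cell pointing back at the original data row key. The "rowkey" qualifier
// below is a hypothetical naming choice.
public static class TwoLevelIndexMapper
    extends TableMapper<ImmutableBytesWritable, Put> {

  private String tableName;
  private byte[] columnFamily;
  private String[] qualifiers;

  @Override
  protected void setup(Context context) {
    Configuration conf = context.getConfiguration();
    tableName = conf.get("tableName");
    columnFamily = Bytes.toBytes(conf.get("columnFamily"));
    qualifiers = conf.getStrings("qualifiers");
  }

  @Override
  protected void map(ImmutableBytesWritable row, Result result, Context context)
      throws IOException, InterruptedException {
    for (String qualifier : qualifiers) {
      byte[] value = result.getValue(columnFamily, Bytes.toBytes(qualifier));
      if (value == null) {
        continue; // this row has no value for the indexed column
      }
      // Assumed convention: one index table per qualifier
      ImmutableBytesWritable indexTable = new ImmutableBytesWritable(
          Bytes.toBytes(tableName + "-" + qualifier));
      // Index row key = column value; the cell stores the data row key
      Put put = new Put(value);
      put.add(columnFamily, Bytes.toBytes("rowkey"), row.get());
      context.write(indexTable, put);
    }
  }
}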