Java class org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer example source code

Project: cloud-bigtable-client    File: Import.java
/**
 * Sets up the actual job.
 * @param conf The current configuration.
 * @param args The command line parameters.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public static Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
  TableName tableName = TableName.valueOf(args[0]);
  conf.set(TABLE_NAME, tableName.getNameAsString());
  Path inputDir = new Path(args[1]);
  Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
  job.setJarByClass(Importer.class);
  FileInputFormat.setInputPaths(job, inputDir);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);

  // make sure we get the filter in the jars
  try {
    Class<? extends Filter> filter = conf.getClass(FILTER_CLASS_CONF_KEY, null, Filter.class);
    if (filter != null) {
      TableMapReduceUtil.addDependencyJars(conf, filter);
    }
  } catch (Exception e) {
    throw new IOException(e);
  }

  if (hfileOutPath != null) {
    job.setMapperClass(KeyValueImporter.class);
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
      job.setReducerClass(KeyValueSortReducer.class);
      Path outputDir = new Path(hfileOutPath);
      FileOutputFormat.setOutputPath(job, outputDir);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);
      TableMapReduceUtil.addDependencyJars(job.getConfiguration(),
          com.google.common.base.Preconditions.class);
    }
  } else {
    // No reducers.  Just write straight to table.  Call initTableReducerJob
    // because it sets up the TableOutputFormat.
    job.setMapperClass(Importer.class);
    TableMapReduceUtil.initTableReducerJob(tableName.getNameAsString(), null, job);
    job.setNumReduceTasks(0);
  }
  return job;
}
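
Note: when BULK_OUTPUT_CONF_KEY is set, the job above only writes HFiles under hfileOutPath; they still have to be bulk-loaded into the table afterwards. A minimal sketch of that follow-up step, assuming the HBase 1.x LoadIncrementalHFiles API (the class and method names below are illustrative and not part of Import.java):

// Illustrative follow-up step (not part of Import.java): bulk-load the HFiles the job wrote.
// Assumes HBase 1.x, where LoadIncrementalHFiles#doBulkLoad(Path, Admin, Table, RegionLocator) exists.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadAfterImport {
  public static void bulkLoad(String hfileOutPath, TableName tableName) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(tableName);
        RegionLocator regionLocator = conn.getRegionLocator(tableName)) {
      // Reads every HFile under hfileOutPath and moves it into the matching region.
      new LoadIncrementalHFiles(conf).doBulkLoad(new Path(hfileOutPath), admin, table, regionLocator);
    }
  }
}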
Project: kylin    File: HFileOutputFormat3.java
static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor,
        RegionLocator regionLocator, Class<? extends OutputFormat<?, ?>> cls)
        throws IOException, UnsupportedEncodingException {
    Configuration conf = job.getConfiguration();
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(cls);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
        job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
        job.setReducerClass(PutSortReducer.class);
    } else if (Text.class.equals(job.getMapOutputValueClass())) {
        job.setReducerClass(TextSortReducer.class);
    } else {
        LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
    }

    conf.setStrings("io.serializations", conf.get("io.serializations"),
            MutationSerialization.class.getName(), ResultSerialization.class.getName(),
            KeyValueSerialization.class.getName());

    // Use table's region boundaries for TotalOrderPartitioner (TOP) split points.
    LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
    List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
    LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
            "to match current region count");
    job.setNumReduceTasks(startKeys.size());

    configurePartitioner(job, startKeys);
    // Set compression algorithms based on column families
    configureCompression(conf, tableDescriptor);
    configureBloomType(tableDescriptor, conf);
    configureBlockSize(tableDescriptor, conf);
    configureDataBlockEncoding(tableDescriptor, conf);

    TableMapReduceUtil.addDependencyJars(job);
    TableMapReduceUtil.initCredentials(job);
    LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
}
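
For context, a sketch of how a driver might reach this method: the four-argument overload above is package-private, and it selects KeyValueSortReducer whenever the map output value class is KeyValue. The mapper, variable, and output-directory names below are hypothetical.

// Hypothetical driver (same package as HFileOutputFormat3, since the overload above is package-private).
// Because the map output value class is KeyValue, configureIncrementalLoad picks KeyValueSortReducer.
Job job = Job.getInstance(conf, "prepare-hfiles");        // conf: an existing Configuration
job.setMapperClass(MyKeyValueEmittingMapper.class);       // hypothetical mapper that emits KeyValue values
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
HFileOutputFormat3.configureIncrementalLoad(job, tableDescriptor, regionLocator,
        HFileOutputFormat3.class);                        // tableDescriptor/regionLocator describe the target table
FileOutputFormat.setOutputPath(job, new Path(hfileOutputDir));  // hfileOutputDir: target HFile directory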
Project: Kylin    File: CubeHFileJob.java
public int run(String[] args) throws Exception {
    Options options = new Options();

    try {
        options.addOption(OPTION_JOB_NAME);
        options.addOption(OPTION_CUBE_NAME);
        options.addOption(OPTION_INPUT_PATH);
        options.addOption(OPTION_OUTPUT_PATH);
        options.addOption(OPTION_HTABLE_NAME);
        parseOptions(options, args);

        Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
        String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();

        CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());

        CubeInstance cube = cubeMgr.getCube(cubeName);
        job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));

        setJobClasspath(job);

        addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
        FileOutputFormat.setOutputPath(job, output);

        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setMapperClass(CubeHFileMapper.class);
        job.setReducerClass(KeyValueSortReducer.class);

        // set job configuration
        job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
        Configuration conf = HBaseConfiguration.create(getConf());
        // add metadata to distributed cache
        attachKylinPropsAndMetadata(cube, job.getConfiguration());

        String tableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase();
        HTable htable = new HTable(conf, tableName);

        // Automatic config: configureIncrementalLoad derives the partitioner, reducer,
        // and HFile output settings from the target table.
        HFileOutputFormat.configureIncrementalLoad(job, htable);

        // set block replication to 3 for hfiles
        conf.set(DFSConfigKeys.DFS_REPLICATION_KEY, "3");

        this.deletePath(job.getConfiguration(), output);

        return waitForCompletion(job);
    } catch (Exception e) {
        logger.error("error in CubeHFileJob", e);
        printUsage(options);
        throw e;
    }
}
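
All three examples hand sorting off to KeyValueSortReducer before the HFile writer runs: its job is simply to re-emit each row's KeyValues in KeyValue sort order. A simplified sketch of that idea (not the exact HBase implementation, which also reports progress via the task status):

// Simplified sketch of the KeyValueSortReducer idea (not the exact HBase implementation):
// re-emit each row's KeyValues in KeyValue sort order so HFileOutputFormat can write them.
import java.io.IOException;
import java.util.TreeSet;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class KeyValueSortReducerSketch
        extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs, Context context)
            throws IOException, InterruptedException {
        TreeSet<KeyValue> sorted = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
        for (KeyValue kv : kvs) {
            // Defensive copy, in case the framework reuses the incoming KeyValue instance
            // (the real KeyValueSortReducer clones each KeyValue for the same reason).
            byte[] copy = new byte[kv.getLength()];
            System.arraycopy(kv.getBuffer(), kv.getOffset(), copy, 0, kv.getLength());
            sorted.add(new KeyValue(copy, 0, copy.length));
        }
        for (KeyValue kv : sorted) {
            context.write(row, kv);  // emit in sorted order, as HFiles require
        }
    }
}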