Java class org.apache.hadoop.mapreduce.lib.partition.InputSampler: example source code
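
The snippets below are collected from several open-source projects and all exercise the same total-order-sort pattern: configure the job, register TotalOrderPartitioner and its partition file, sample the input with an InputSampler.Sampler, write the split points with InputSampler.writePartitionFile, and only then submit the job. For orientation, here is a minimal sketch of that pattern; the class name, paths and sampling parameters are placeholders rather than code from any of the projects below, and it assumes a sequence file with Text keys and values.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalOrderSortSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "total order sort");
        job.setJarByClass(TotalOrderSortSketch.class);

        // identity mapper and reducer are the defaults; the shuffle does the actual sorting
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(8);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // tell the partitioner where the split points live, then generate them by sampling the input
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[1] + "_partition.lst"));
        job.setPartitionerClass(TotalOrderPartitioner.class);
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 10000);
        InputSampler.writePartitionFile(job, sampler);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

InputSampler.writePartitionFile reads the job's input through its configured InputFormat and derives numReduceTasks - 1 split points from the sampled keys, so the input paths, the input format, the number of reducers and the map output key class all have to be set before the call; the examples below follow that ordering.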

Project: ldbc_snb_datagen    File: HadoopFileSorter.java
/**
 * Sorts a Hadoop sequence file.
 *
 * @param inputFileName  The name of the file to sort.
 * @param outputFileName The name of the sorted file.
 * @throws Exception if the sort job fails to complete.
 */
public void run(String inputFileName, String outputFileName) throws Exception {
    int numThreads = conf.getInt("ldbc.snb.datagen.generator.numThreads", 1);
    Job job = Job.getInstance(conf, "Sorting " + inputFileName);

    FileInputFormat.setInputPaths(job, new Path(inputFileName));
    FileOutputFormat.setOutputPath(job, new Path(outputFileName));

    job.setMapOutputKeyClass(K);
    job.setMapOutputValueClass(V);
    job.setOutputKeyClass(K);
    job.setOutputValueClass(V);
    job.setNumReduceTasks(numThreads);

    job.setJarByClass(V);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

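    // sample ~10% of the input keys (up to 1000 samples) and write the split points used by TotalOrderPartitioner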
    InputSampler.Sampler sampler = new InputSampler.RandomSampler(0.1, 1000);
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(inputFileName + "_partition.lst"));
    InputSampler.writePartitionFile(job, sampler);
    job.setPartitionerClass(TotalOrderPartitioner.class);
    if (!job.waitForCompletion(true)) {
        throw new Exception();
    }
}
Project: TeraSort-Local-Hadoop-MR-Spark    File: TeraSort.java
@Override
public int run(String[] newargs) throws Exception {

    Job job = Job.getInstance(new Configuration());
    job.setJarByClass(TeraSort.class);

    // set mapper and reducer classes
    job.setMapperClass(TeraMapper.class);
    job.setReducerClass(TeraReducer.class);

    // set number of reducers
    job.setNumReduceTasks(32);

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    // set the map output key class to Text
    job.setMapOutputKeyClass(Text.class);

    // set the reducer output key and value classes, both Text
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    // set input path for the job
    FileInputFormat.addInputPath(job, new Path(newargs[0]));

    Path partitionFile = new Path(new Path(newargs[2]), "partitioning");
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);

    // use random sampler to write partitioner file
    InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 32);
    InputSampler.writePartitionFile(job, sampler);

    // set partitioner to TotalOrderPartitioner
    job.setPartitionerClass(TotalOrderPartitioner.class);

    // set output directory for the job
    FileOutputFormat.setOutputPath(job, new Path(newargs[1]));

    int ret = job.waitForCompletion(true) ? 0 : 1;
    logger.info("Done");
    return ret;

}
Project: hdt-mr    File: HDTBuilderDriver.java
protected boolean runDictionaryJob() throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
    boolean jobOK;
    Job job = null;
    BufferedWriter bufferedWriter;

    // if output path exists...
    if (this.dictionaryFS.exists(this.conf.getDictionaryOutputPath())) {
        if (this.conf.getDeleteDictionaryOutputPath()) { // ... and option provided, delete recursively
            this.dictionaryFS.delete(this.conf.getDictionaryOutputPath(), true);
        } else { // ... and option not provided, fail
            System.out.println("Dictionary output path does exist: " + this.conf.getDictionaryOutputPath());
            System.out.println("Select other path or use option -dd to overwrite");
            System.exit(-1);
        }
    }

    // Sample the SequenceInputFormat to do TotalSort and create final output
    job = new Job(this.conf.getConfigurationObject(), this.conf.getDictionaryJobName() + " phase 2");

    job.setJarByClass(HDTBuilderDriver.class);

    System.out.println("samples = " + this.conf.getDictionarySamplesPath());
    System.out.println("output = " + this.conf.getDictionaryOutputPath());

    FileInputFormat.addInputPath(job, this.conf.getDictionarySamplesPath());
    FileOutputFormat.setOutputPath(job, this.conf.getDictionaryOutputPath());

    job.setInputFormatClass(SequenceFileInputFormat.class);
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    // Identity Mapper
    // job.setMapperClass(Mapper.class);
    job.setCombinerClass(DictionaryCombiner.class);
    job.setPartitionerClass(TotalOrderPartitioner.class);
    job.setReducerClass(DictionaryReducer.class);

    job.setNumReduceTasks(this.conf.getDictionaryReducers());

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    System.out.println("Sampling started");
    InputSampler.writePartitionFile(job, new InputSampler.IntervalSampler<Text, Text>(this.conf.getSampleProbability()));
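    // distribute the partition file to all tasks, symlinked under TotalOrderPartitioner.DEFAULT_PATH ("_partition.lst")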
    String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());
    URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
    DistributedCache.addCacheFile(partitionUri, job.getConfiguration());
    DistributedCache.createSymlink(job.getConfiguration());
    System.out.println("Sampling finished");

    MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.SHARED, SequenceFileOutputFormat.class, Text.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.SUBJECTS, SequenceFileOutputFormat.class, Text.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.PREDICATES, SequenceFileOutputFormat.class, Text.class, NullWritable.class);
    MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.OBJECTS, SequenceFileOutputFormat.class, Text.class, NullWritable.class);

    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, com.hadoop.compression.lzo.LzoCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

    jobOK = job.waitForCompletion(true);

    this.numShared = job.getCounters().findCounter(Counters.Shared).getValue();
    this.numSubjects = job.getCounters().findCounter(Counters.Subjects).getValue();
    this.numPredicates = job.getCounters().findCounter(Counters.Predicates).getValue();
    this.numObjects = job.getCounters().findCounter(Counters.Objects).getValue();

    bufferedWriter = new BufferedWriter(new OutputStreamWriter(this.dictionaryFS.create(this.conf.getDictionaryCountersFile())));

    bufferedWriter.write(HDTBuilderConfiguration.SHARED + "=" + this.numShared + "\n");
    bufferedWriter.write(HDTBuilderConfiguration.SUBJECTS + "=" + this.numSubjects + "\n");
    bufferedWriter.write(HDTBuilderConfiguration.PREDICATES + "=" + this.numPredicates + "\n");
    bufferedWriter.write(HDTBuilderConfiguration.OBJECTS + "=" + this.numObjects + "\n");

    bufferedWriter.close();

    return jobOK;
}
Project: hdt-mr    File: HDTBuilderDriver.java
protected boolean runTriplesJob() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
    Job job = null;
    boolean jobOK;

    // if triples output path exists...
    if (this.triplesFS.exists(this.conf.getTriplesOutputPath())) {
        if (this.conf.getDeleteTriplesOutputPath()) { // ... and option provided, delete recursively
            this.triplesFS.delete(this.conf.getTriplesOutputPath(), true);
        } else { // ... and option not provided, fail
            System.out.println("Triples output path does exist: " + this.conf.getTriplesOutputPath());
            System.out.println("Select other path or use option -dt to overwrite");
            System.exit(-1);
        }
    }

    job = new Job(this.conf.getConfigurationObject(), this.conf.getTriplesJobName() + " phase 2");

    job.setJarByClass(HDTBuilderDriver.class);

    FileInputFormat.addInputPath(job, this.conf.getTriplesSamplesPath());
    FileOutputFormat.setOutputPath(job, this.conf.getTriplesOutputPath());

    job.setInputFormatClass(SequenceFileInputFormat.class);
    LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

    job.setSortComparatorClass(TripleSPOComparator.class);
    job.setGroupingComparatorClass(TripleSPOComparator.class);

    job.setPartitionerClass(TotalOrderPartitioner.class);

    job.setOutputKeyClass(TripleSPOWritable.class);
    job.setOutputValueClass(NullWritable.class);

    job.setNumReduceTasks(this.conf.getTriplesReducers());

    System.out.println("Sampling started");
    InputSampler.writePartitionFile(job, new InputSampler.IntervalSampler<Text, Text>(this.conf.getSampleProbability()));
    String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());
    URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
    DistributedCache.addCacheFile(partitionUri, job.getConfiguration());
    DistributedCache.createSymlink(job.getConfiguration());
    System.out.println("Sampling finished");

    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, com.hadoop.compression.lzo.LzoCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

    jobOK = job.waitForCompletion(true);

    return jobOK;
}
Project: tree-index    File: SortDriver.java
public int run(String[] args) throws Exception {
    SortConfig config = new SortConfig();
    config.fromArray(args);

    Job job = Job.getInstance(getConf());
    job.setJobName("sort");
    job.setJarByClass(SortDriver.class);

    // define the path
    Path inputPath = new Path(config.getInput());
    Path partitionFilePath = new Path(config.getPartition());
    Path outputPath = new Path(config.getOutput());
    Path metaPath = new Path(config.getMeta());
    LOGGER.info("use " + inputPath.toString() + " as sort input");
    LOGGER.info("use " + partitionFilePath.toString() + " as partition");
    LOGGER.info("use " + outputPath.toString() + " as sort output");
    LOGGER.info("use " + metaPath.toString() + " as meta output");

    // define the mapper
    // use the identity mapper, which is the default implementation
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    SequenceFileInputFormat.setInputPaths(job, inputPath);

    // define the reducer
    job.getConfiguration().set(SortReducer.META_BASE_CONFIG_NAME, metaPath.toString());
    job.setReducerClass(SortReducer.class);
    job.setNumReduceTasks(NUM_REDUCER);
    // use text output for easier debugging; a sequence file output would likely be faster
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    TextOutputFormat.setOutputPath(job, outputPath);

    // set partition
    job.setPartitionerClass(TotalOrderPartitioner.class);
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFilePath);

    // set the sampler
    InputSampler.writePartitionFile(job, new InputSampler.RandomSampler(
            1, 10000));

    // set multiple output
    MultipleOutputs.addNamedOutput(job, "meta", TextOutputFormat.class,
            IntWritable.class, Text.class);

    // clean up the old output path
    outputPath.getFileSystem(job.getConfiguration()).delete(outputPath, true);
    metaPath.getFileSystem(job.getConfiguration()).delete(metaPath, true);

    // run the job and wait until it complete
    return job.waitForCompletion(true) ? 0 : 1;
}
Project: hiped2    File: TotalSortMapReduce.java
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {

  int numReducers = 2;

  Cli cli = Cli.builder().setArgs(args).addOptions(CliOpts.values()).build();
  int result = cli.runCmd();

  if (result != 0) {
    return result;
  }

  Path input = new Path(cli.getArgValueAsString(CliOpts.INPUT));
  Path partitionFile = new Path(cli.getArgValueAsString(CliOpts.PARTITION));
  Path output = new Path(cli.getArgValueAsString(CliOpts.OUTPUT));


  InputSampler.Sampler<Text, Text> sampler =
      new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);

  Configuration conf = super.getConf();

  Job job = new Job(conf);
  job.setJarByClass(TotalSortMapReduce.class);

  job.setNumReduceTasks(numReducers);

  job.setInputFormatClass(KeyValueTextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setPartitionerClass(TotalOrderPartitioner.class);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, output);

  InputSampler.writePartitionFile(job, sampler);

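  // cache the partition file for the tasks; the URI fragment names the local symlink (_sortPartitioning)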
  URI partitionUri = new URI(partitionFile.toString() +
      "#" + "_sortPartitioning");
  // the Job constructor copies conf, so register the cache file on the job's own configuration
  DistributedCache.addCacheFile(partitionUri, job.getConfiguration());

  if (job.waitForCompletion(true)) {
    return 0;
  }
  return 1;
}
Project: ldbc_snb_datagen    File: HadoopFileRanker.java
/**
 * Sorts a Hadoop sequence file and assigns a rank (its position in the global order) to each element.
 *
 * @param inputFileName  The name of the file to sort.
 * @param outputFileName The name of the ranked output file.
 * @throws Exception if either job fails to complete.
 */
public void run(String inputFileName, String outputFileName) throws Exception {
    int numThreads = conf.getInt("ldbc.snb.datagen.generator.numThreads", 1);

    if (keySetterName != null) {
        conf.set("keySetterClassName", keySetterName);
    }

    /** First Job to sort the key-value pairs and to count the number of elements processed by each reducer.**/
    Job jobSort = Job.getInstance(conf, "Sorting " + inputFileName);

    FileInputFormat.setInputPaths(jobSort, new Path(inputFileName));
    FileOutputFormat
            .setOutputPath(jobSort, new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rankIntermediate"));

    if (keySetterName != null) {
        jobSort.setMapperClass(HadoopFileRankerSortMapper.class);
    }
    jobSort.setMapOutputKeyClass(K);
    jobSort.setMapOutputValueClass(V);
    jobSort.setOutputKeyClass(BlockKey.class);
    jobSort.setOutputValueClass(V);
    jobSort.setNumReduceTasks(numThreads);
    jobSort.setReducerClass(HadoopFileRankerSortReducer.class);
    jobSort.setJarByClass(V);
    jobSort.setInputFormatClass(SequenceFileInputFormat.class);
    jobSort.setOutputFormatClass(SequenceFileOutputFormat.class);
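    // sample the input keys and write the partition file so the reducer output is globally ordered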
    InputSampler.Sampler sampler = new InputSampler.RandomSampler(0.1, 1000);
    TotalOrderPartitioner.setPartitionFile(jobSort.getConfiguration(), new Path(inputFileName + "_partition.lst"));
    InputSampler.writePartitionFile(jobSort, sampler);
    jobSort.setPartitionerClass(TotalOrderPartitioner.class);
    if (!jobSort.waitForCompletion(true)) {
        throw new Exception();
    }

    /** Second Job to assign the rank to each element.**/
    Job jobRank = Job.getInstance(conf, "Sorting " + inputFileName);
    FileInputFormat
            .setInputPaths(jobRank, new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rankIntermediate"));
    FileOutputFormat.setOutputPath(jobRank, new Path(outputFileName));

    jobRank.setMapOutputKeyClass(BlockKey.class);
    jobRank.setMapOutputValueClass(V);
    jobRank.setOutputKeyClass(LongWritable.class);
    jobRank.setOutputValueClass(V);
    jobRank.setSortComparatorClass(BlockKeyComparator.class);
    jobRank.setNumReduceTasks(numThreads);
    jobRank.setReducerClass(HadoopFileRankerFinalReducer.class);
    jobRank.setJarByClass(V);
    jobRank.setInputFormatClass(SequenceFileInputFormat.class);
    jobRank.setOutputFormatClass(SequenceFileOutputFormat.class);
    jobRank.setPartitionerClass(HadoopFileRankerPartitioner.class);
    if (!jobRank.waitForCompletion(true)) {
        throw new Exception();
    }

    try {
        FileSystem fs = FileSystem.get(conf);
        for (int i = 0; i < numThreads; ++i) {
            fs.delete(new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rank_" + i), true);
        }
        fs.delete(new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rankIntermediate"), true);
    } catch (IOException e) {
        System.err.println(e.getMessage());
    }
}
Project: hbase-in-action    File: BulkImportJobExample.java
/**
 * Fixes a potential overlap of generated regions/splits for a dataset with lots of identical keys. For instance,
 * if the samples are {1,1,1,1,3,3,3,5,6} and the number of partitions is 3, the original implementation produces
 * the splits 1-1, 3-3 and 3-6; note the overlap between the 2nd and 3rd partitions.
 *
 * @param job     the job whose reducer count, grouping comparator and partition file location are used
 * @param sampler the sampler that supplies the keys from which split points are derived
 * @param <K>     the map output key type
 * @param <V>     the map output value type
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
@SuppressWarnings("unchecked")
public static <K, V> void writePartitionFile(Job job, InputSampler.Sampler<K, V> sampler)
  throws IOException, ClassNotFoundException, InterruptedException {
  LinkedList<K> splits = new LinkedList<K>();
  Configuration conf = job.getConfiguration();
  final InputFormat inf =
    ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  int numPartitions = job.getNumReduceTasks();
  K[] samples = sampler.getSample(inf, job);
  LOG.info("Using " + samples.length + " samples");
  RawComparator<K> comparator = (RawComparator<K>) job.getGroupingComparator();
  Arrays.sort(samples, comparator);
  Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
  FileSystem fs = dst.getFileSystem(conf);
  if (fs.exists(dst)) fs.delete(dst, false);
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst, job.getMapOutputKeyClass(), NullWritable.class);
  NullWritable nullValue = NullWritable.get();
  float stepSize = samples.length / (float) numPartitions;

  K lastKey = null;
  K currentKey = null;
  int lastKeyIndex = -1;
  for (int i = 1; i < numPartitions; ++i) {
    int currentKeyOffset = Math.round(stepSize * i);
    if (lastKeyIndex > currentKeyOffset) {
      long keyOffset = lastKeyIndex - currentKeyOffset;
      float errorRate = (float) keyOffset / samples.length;
      LOG.warn(
        String.format("Partitions overlap. Consider using a different Sampler " +
          "and/or increase the number of samples and/or use more splits to take samples from. " +
          "Next sample would have been %s key overlaps by a distance of %d (factor %f) ", samples[currentKeyOffset], keyOffset, errorRate));
      currentKeyOffset = lastKeyIndex + 1;
    }
    currentKey = samples[currentKeyOffset];

    while (lastKey != null && comparator.compare(currentKey, lastKey) == 0) {
      currentKeyOffset++;
      if (currentKeyOffset >= samples.length) {
        LOG.info("Last 10 elements:");

        for (int d = samples.length - 1; d > Math.max(samples.length - 11, -1); d--) { // guard against fewer than 10 samples
          LOG.debug(samples[d]);
        }
        throw new IOException("Not enough samples, stopped at partition " + i);
      }
      currentKey = samples[currentKeyOffset];
    }

    writer.append(currentKey, nullValue);
    lastKey = currentKey;
    lastKeyIndex = currentKeyOffset;
    splits.add(currentKey);
  }
  writer.close();
  LOG.info("*********************************************  ");
  LOG.info(" START KEYs for new Regions:  ");
  for (K split : splits) {
    LOG.info("* " + split.toString());
  }

}
Project: ldbc_snb_datagen_deprecated2015    File: HadoopFileRanker.java
/** Sorts a Hadoop sequence file and assigns a rank to each element.
 *
 * @param inputFileName The name of the file to sort.
 * @param outputFileName The name of the ranked output file.
 * @throws Exception
 */
public void run(String inputFileName, String outputFileName) throws Exception {
    int numThreads = conf.getInt("numThreads", 1);

    /** First Job to sort the key-value pairs and to count the number of elements processed by each reducer.**/
    Job jobSort = new Job(conf, "Sorting " + inputFileName);

    FileInputFormat.setInputPaths(jobSort, new Path(inputFileName));
    FileOutputFormat.setOutputPath(jobSort, new Path(conf.get("outputDir") + "/hadoop/rankIntermediate"));

    jobSort.setMapOutputKeyClass(K);
    jobSort.setMapOutputValueClass(V);
    jobSort.setOutputKeyClass(ComposedKey.class);
    jobSort.setOutputValueClass(V);
    jobSort.setNumReduceTasks(numThreads);
    jobSort.setReducerClass(HadoopFileRankerSortReducer.class);
    jobSort.setJarByClass(V);
    jobSort.setInputFormatClass(SequenceFileInputFormat.class);
    jobSort.setOutputFormatClass(SequenceFileOutputFormat.class);
    InputSampler.Sampler sampler = new InputSampler.RandomSampler(0.1, 1000);
    TotalOrderPartitioner.setPartitionFile(jobSort.getConfiguration(), new Path(inputFileName + "_partition.lst"));
    InputSampler.writePartitionFile(jobSort, sampler);
    jobSort.setPartitionerClass(TotalOrderPartitioner.class);
    jobSort.waitForCompletion(true);

    /** Second Job to assign the rank to each element.**/
    Job jobRank = new Job(conf, "Sorting " + inputFileName);
    FileInputFormat.setInputPaths(jobRank, new Path(conf.get("outputDir") + "/hadoop/rankIntermediate"));
    FileOutputFormat.setOutputPath(jobRank, new Path(outputFileName));

    jobRank.setMapOutputKeyClass(ComposedKey.class);
    jobRank.setMapOutputValueClass(V);
    jobRank.setOutputKeyClass(LongWritable.class);
    jobRank.setOutputValueClass(V);
    jobRank.setSortComparatorClass(ComposedKeyComparator.class);
    jobRank.setNumReduceTasks(numThreads);
    jobRank.setReducerClass(HadoopFileRankerFinalReducer.class);
    jobRank.setJarByClass(V);
    jobRank.setInputFormatClass(SequenceFileInputFormat.class);
    jobRank.setOutputFormatClass(SequenceFileOutputFormat.class);
    jobRank.setPartitionerClass(HadoopFileRankerPartitioner.class);
    jobRank.waitForCompletion(true);

    try {
        FileSystem fs = FileSystem.get(conf);
        for (int i = 0; i < numThreads; ++i) {
            fs.delete(new Path(conf.get("outputDir") + "/hadoop/rank_" + i), true);
        }
        fs.delete(new Path(conf.get("outputDir") + "/hadoop/rankIntermediate"), true);
    } catch (IOException e) {
        System.err.println(e.getMessage());
    }
}
Project: hadoop-map-reduce-patterns    File: TotalOrderSortingStage.java
@SuppressWarnings("unchecked")
@Override
public int run(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path inputPath = new Path(args[0]);
    Path partitionFile = new Path(args[1] + "_partitions.lst");
    Path outputStage = new Path(args[1] + "_staging");
    Path outputOrder = new Path(args[1]);
    // Configure job to prepare for sampling
    Job sampleJob = new Job(conf, "TotalOrderSortingStage");
    sampleJob.setJarByClass(TotalOrderSortingStage.class);
    // Use the mapper implementation with zero reduce tasks
    sampleJob.setMapperClass(LastAccessMapper.class);
    sampleJob.setNumReduceTasks(0);
    sampleJob.setOutputKeyClass(Text.class);
    sampleJob.setOutputValueClass(Text.class);
    TextInputFormat.setInputPaths(sampleJob, inputPath);
    // Set the output format to a sequence file
    sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage);
    // Submit the job and get completion code.
    int code = sampleJob.waitForCompletion(true) ? 0 : 1;
    if (code == 0) {
        Job orderJob = new Job(conf, "TotalOrderSortingStage");
        orderJob.setJarByClass(TotalOrderSortingStage.class);
        // Here, use the identity mapper to output the key/value pairs in
        // the SequenceFile
        orderJob.setMapperClass(Mapper.class);
        orderJob.setReducerClass(ValuesReducer.class);
        // Set the number of reduce tasks to an appropriate number for the
        // amount of data being sorted
        orderJob.setNumReduceTasks(10);
        // Use Hadoop's TotalOrderPartitioner class
        orderJob.setPartitionerClass(TotalOrderPartitioner.class);
        // Set the partition file
        TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(),
                partitionFile);
        orderJob.setOutputKeyClass(Text.class);
        orderJob.setOutputValueClass(Text.class);
        // Set the input to the previous job's output
        orderJob.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(orderJob, outputStage);
        // Set the output path to the command line parameter
        TextOutputFormat.setOutputPath(orderJob, outputOrder);
        // Set the separator to an empty string
        orderJob.getConfiguration().set(
                "mapred.textoutputformat.separator", "");
        // Use the InputSampler to go through the output of the previous
        // job, sample it, and create the partition file
        InputSampler.writePartitionFile(orderJob,
                new InputSampler.RandomSampler(.001, 10000));
        // Submit the job
        code = orderJob.waitForCompletion(true) ? 0 : 2;
    }
    // Clean up the partition file and the staging directory
    FileSystem.get(new Configuration()).delete(partitionFile, false);
    FileSystem.get(new Configuration()).delete(outputStage, true);
    return code;
}