/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
    throws IOException {
  Configuration conf = job.getConfiguration();
  // create the partitions file
  FileSystem fs = FileSystem.get(conf);
  String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
  Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
  // makeQualified() returns the qualified path; keep the result, otherwise the call has no effect
  partitionsPath = fs.makeQualified(partitionsPath);
  writePartitions(conf, partitionsPath, splitPoints);
  fs.deleteOnExit(partitionsPath);

  // configure job to use it
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
/**
 * Sorts a Hadoop sequence file.
 *
 * @param inputFileName  The name of the file to sort.
 * @param outputFileName The name of the sorted file.
 * @throws Exception if the sort job fails.
 */
public void run(String inputFileName, String outputFileName) throws Exception {
  int numThreads = conf.getInt("ldbc.snb.datagen.generator.numThreads", 1);

  Job job = Job.getInstance(conf, "Sorting " + inputFileName);
  FileInputFormat.setInputPaths(job, new Path(inputFileName));
  FileOutputFormat.setOutputPath(job, new Path(outputFileName));

  job.setMapOutputKeyClass(K);
  job.setMapOutputValueClass(V);
  job.setOutputKeyClass(K);
  job.setOutputValueClass(V);
  job.setNumReduceTasks(numThreads);
  job.setJarByClass(V);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);

  // Sample the input to build the TotalOrderPartitioner's partition file.
  InputSampler.Sampler sampler = new InputSampler.RandomSampler(0.1, 1000);
  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
      new Path(inputFileName + "_partition.lst"));
  InputSampler.writePartitionFile(job, sampler);
  job.setPartitionerClass(TotalOrderPartitioner.class);

  if (!job.waitForCompletion(true)) {
    throw new Exception("Sort job for " + inputFileName + " failed");
  }
}
/**
 * Check whether a partition file exists for the HFiles; if so, replace the table splits
 * so that the job runs with more reducers.
 *
 * @param conf the job configuration
 * @param path the hfile partition file
 * @throws IOException
 */
@SuppressWarnings("deprecation")
private void reconfigurePartitions(Configuration conf, Path path) throws IOException {
  FileSystem fs = path.getFileSystem(conf);
  if (fs.exists(path)) {
    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf)) {
      // Count the split keys stored in the partition file.
      int partitionCount = 0;
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
      while (reader.next(key, value)) {
        partitionCount++;
      }
      TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), path);
      // The number of reduce tasks should be one more than the number of partition keys.
      job.setNumReduceTasks(partitionCount + 1);
    }
  } else {
    logger.info("File '" + path.toString() + "' doesn't exist, will not reconfigure hfile partitions");
  }
}
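// The reconfigurePartitions() method above only consumes an existing partition file.
// For context, a minimal sketch (not from the original source; the class and method names
// below are illustrative assumptions) of how such a file is typically produced: a
// SequenceFile whose keys are the split points and whose values are NullWritable.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;

public final class HFilePartitionFileWriterSketch {

  /** Writes one SequenceFile entry per split point, so a reader can count them as above. */
  static void writeHFilePartitionFile(Configuration conf, Path path,
      List<ImmutableBytesWritable> splitPoints) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(
        fs, conf, path, ImmutableBytesWritable.class, NullWritable.class);
    try {
      for (ImmutableBytesWritable splitPoint : splitPoints) {
        writer.append(splitPoint, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
}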
public Job createSubmittableJob(String[] args) throws IOException {
  Path partitionsPath = new Path(destPath, PARTITIONS_FILE_NAME);
  generatePartitions(partitionsPath);

  Job job = Job.getInstance(getConf(),
      getConf().get("mapreduce.job.name", "hashTable_" + tableHash.tableName));
  Configuration jobConf = job.getConfiguration();
  jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
  job.setJarByClass(HashTable.class);

  TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
      HashMapper.class, ImmutableBytesWritable.class, ImmutableBytesWritable.class, job);

  // use a TotalOrderPartitioner and reducers to group region output into hash files
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(jobConf, partitionsPath);
  job.setReducerClass(Reducer.class); // identity reducer
  job.setNumReduceTasks(tableHash.numHashFiles);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(ImmutableBytesWritable.class);
  job.setOutputFormatClass(MapFileOutputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(destPath, HASH_DATA_DIR));

  return job;
}
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints,
    boolean writeMultipleTables) throws IOException {
  Configuration conf = job.getConfiguration();
  // create the partitions file
  FileSystem fs = FileSystem.get(conf);
  String hbaseTmpFsDir = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
  Path partitionsPath = new Path(hbaseTmpFsDir, "partitions_" + UUID.randomUUID());
  // keep the qualified path returned by makeQualified()
  partitionsPath = fs.makeQualified(partitionsPath);
  writePartitions(conf, partitionsPath, splitPoints, writeMultipleTables);
  fs.deleteOnExit(partitionsPath);

  // configure job to use it
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
/**
 * Confirm the absence of the {@link TotalOrderPartitioner} partitions file.
 */
protected static void validateDeletedPartitionsFile(Configuration conf) throws IOException {
  if (!conf.getBoolean(IntegrationTestingUtility.IS_DISTRIBUTED_CLUSTER, false)) {
    return;
  }

  FileSystem fs = FileSystem.get(conf);
  Path partitionsFile = new Path(TotalOrderPartitioner.getPartitionFile(conf));
  assertFalse("Failed to clean up partitions file.", fs.exists(partitionsFile));
}
public static void configureSampling(Path outPath, BioJob job, VCFSortOptions options)
    throws IOException {
  Configuration conf = job.getConfiguration();
  final Path partition = outPath.getFileSystem(conf)
      .makeQualified(new Path(outPath, "_partitioning" + "VCF"));

  TotalOrderPartitioner.setPartitionFile(conf, partition);
  try {
    // Ship the partition file to the tasks via a cache URI with a fragment,
    // unless it already lives on the local file system.
    final URI partitionURI = new URI(partition.toString() + "#" + partition.getName());
    if (partitionURI.getScheme().equals("file")) {
      return;
    }
    ReferenceShare.distributeCache(partitionURI.toString(), job);
  } catch (URISyntaxException e) {
    throw new RuntimeException(e);
  }
}
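// Note: configureSampling() above only registers the partition file and ships it via the
// cache; it does not populate it. A minimal sketch (an assumption, not part of the original
// source) of the sampling step that would normally follow, using Hadoop's stock InputSampler;
// the LongWritable/Text types are placeholders for the job's real VCF key/value classes.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;

public final class VcfSamplingSketch {

  /** Samples the job's configured input and writes split keys to the registered partition file. */
  static void writeVcfPartitionFile(Job job) throws Exception {
    InputSampler.Sampler<LongWritable, Text> sampler =
        new InputSampler.RandomSampler<LongWritable, Text>(0.01, 10000, 10);
    InputSampler.writePartitionFile(job, sampler);
  }
}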
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
    throws IOException {
  Configuration conf = job.getConfiguration();
  // create the partitions file under hadoop.tmp.dir
  FileSystem fs = FileSystem.get(conf);
  Path partitionsPath = new Path(conf.get("hadoop.tmp.dir"), "partitions_" + UUID.randomUUID());
  // keep the qualified path returned by makeQualified()
  partitionsPath = fs.makeQualified(partitionsPath);
  writePartitions(conf, partitionsPath, splitPoints);
  fs.deleteOnExit(partitionsPath);

  // configure job to use it
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
    throws IOException {
  // create the partitions file
  FileSystem fs = FileSystem.get(job.getConfiguration());
  Path partitionsPath = new Path("/tmp", "partitions_" + UUID.randomUUID());
  // keep the qualified path returned by makeQualified()
  partitionsPath = fs.makeQualified(partitionsPath);
  fs.deleteOnExit(partitionsPath);
  writePartitions(job.getConfiguration(), partitionsPath, splitPoints);

  // configure job to use it
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);
}
/**
 * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
 * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
 */
static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints)
    throws IOException {
  Configuration conf = job.getConfiguration();
  // create the partitions file under hbase.fs.tmp.dir
  FileSystem fs = FileSystem.get(conf);
  Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + UUID.randomUUID());
  // keep the qualified path returned by makeQualified()
  partitionsPath = fs.makeQualified(partitionsPath);
  writePartitions(conf, partitionsPath, splitPoints);
  fs.deleteOnExit(partitionsPath);

  // configure job to use it
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
}
@Override
public int run(String[] newargs) throws Exception {
  Job job = Job.getInstance(new Configuration());
  job.setJarByClass(TeraSort.class);

  // set mapper and reducer class
  job.setMapperClass(TeraMapper.class);
  job.setReducerClass(TeraReducer.class);

  // set number of reducers
  job.setNumReduceTasks(32);

  job.setInputFormatClass(KeyValueTextInputFormat.class);

  // map output uses Text as the key
  job.setMapOutputKeyClass(Text.class);

  // the reducer emits Text for both key and value
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);

  // set input path for the job
  FileInputFormat.addInputPath(job, new Path(newargs[0]));

  Path partitionFile = new Path(new Path(newargs[2]), "partitioning");
  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);

  // use a random sampler to write the partition file
  InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 32);
  InputSampler.writePartitionFile(job, sampler);

  // set partitioner to TotalOrderPartitioner
  job.setPartitionerClass(TotalOrderPartitioner.class);

  // set output directory for the job
  FileOutputFormat.setOutputPath(job, new Path(newargs[1]));

  int ret = job.waitForCompletion(true) ? 0 : 1;
  logger.info("Done");
  return ret;
}
protected boolean runDictionaryJob()
    throws ClassNotFoundException, IOException, InterruptedException, URISyntaxException {
  boolean jobOK;
  Job job = null;
  BufferedWriter bufferedWriter;

  // if the output path exists...
  if (this.dictionaryFS.exists(this.conf.getDictionaryOutputPath())) {
    if (this.conf.getDeleteDictionaryOutputPath()) {
      // ... and the option is provided, delete it recursively
      this.dictionaryFS.delete(this.conf.getDictionaryOutputPath(), true);
    } else {
      // ... and the option is not provided, fail
      System.out.println("Dictionary output path already exists: " + this.conf.getDictionaryOutputPath());
      System.out.println("Select another path or use option -dd to overwrite");
      System.exit(-1);
    }
  }

  // Sample the SequenceInputFormat to do a total sort and create the final output
  job = new Job(this.conf.getConfigurationObject(), this.conf.getDictionaryJobName() + " phase 2");

  job.setJarByClass(HDTBuilderDriver.class);

  System.out.println("samples = " + this.conf.getDictionarySamplesPath());
  System.out.println("output = " + this.conf.getDictionaryOutputPath());

  FileInputFormat.addInputPath(job, this.conf.getDictionarySamplesPath());
  FileOutputFormat.setOutputPath(job, this.conf.getDictionaryOutputPath());

  job.setInputFormatClass(SequenceFileInputFormat.class);
  LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

  // Identity Mapper
  // job.setMapperClass(Mapper.class);
  job.setCombinerClass(DictionaryCombiner.class);
  job.setPartitionerClass(TotalOrderPartitioner.class);
  job.setReducerClass(DictionaryReducer.class);

  job.setNumReduceTasks(this.conf.getDictionaryReducers());

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(NullWritable.class);

  System.out.println("Sampling started");
  InputSampler.writePartitionFile(job,
      new InputSampler.IntervalSampler<Text, Text>(this.conf.getSampleProbability()));
  String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());
  URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
  DistributedCache.addCacheFile(partitionUri, job.getConfiguration());
  DistributedCache.createSymlink(job.getConfiguration());
  System.out.println("Sampling finished");

  MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.SHARED,
      SequenceFileOutputFormat.class, Text.class, NullWritable.class);
  MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.SUBJECTS,
      SequenceFileOutputFormat.class, Text.class, NullWritable.class);
  MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.PREDICATES,
      SequenceFileOutputFormat.class, Text.class, NullWritable.class);
  MultipleOutputs.addNamedOutput(job, HDTBuilderConfiguration.OBJECTS,
      SequenceFileOutputFormat.class, Text.class, NullWritable.class);

  SequenceFileOutputFormat.setCompressOutput(job, true);
  SequenceFileOutputFormat.setOutputCompressorClass(job, com.hadoop.compression.lzo.LzoCodec.class);
  SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

  jobOK = job.waitForCompletion(true);

  this.numShared = job.getCounters().findCounter(Counters.Shared).getValue();
  this.numSubjects = job.getCounters().findCounter(Counters.Subjects).getValue();
  this.numPredicates = job.getCounters().findCounter(Counters.Predicates).getValue();
  this.numObjects = job.getCounters().findCounter(Counters.Objects).getValue();

  bufferedWriter = new BufferedWriter(
      new OutputStreamWriter(this.dictionaryFS.create(this.conf.getDictionaryCountersFile())));

  bufferedWriter.write(HDTBuilderConfiguration.SHARED + "=" + this.numShared + "\n");
  bufferedWriter.write(HDTBuilderConfiguration.SUBJECTS + "=" + this.numSubjects + "\n");
  bufferedWriter.write(HDTBuilderConfiguration.PREDICATES + "=" + this.numPredicates + "\n");
  bufferedWriter.write(HDTBuilderConfiguration.OBJECTS + "=" + this.numObjects + "\n");

  bufferedWriter.close();

  return jobOK;
}
protected boolean runTriplesJob()
    throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
  Job job = null;
  boolean jobOK;

  // if the triples output path exists...
  if (this.triplesFS.exists(this.conf.getTriplesOutputPath())) {
    if (this.conf.getDeleteTriplesOutputPath()) {
      // ... and the option is provided, delete it recursively
      this.triplesFS.delete(this.conf.getTriplesOutputPath(), true);
    } else {
      // ... and the option is not provided, fail
      System.out.println("Triples output path already exists: " + this.conf.getTriplesOutputPath());
      System.out.println("Select another path or use option -dt to overwrite");
      System.exit(-1);
    }
  }

  job = new Job(this.conf.getConfigurationObject(), this.conf.getTriplesJobName() + " phase 2");

  job.setJarByClass(HDTBuilderDriver.class);

  FileInputFormat.addInputPath(job, this.conf.getTriplesSamplesPath());
  FileOutputFormat.setOutputPath(job, this.conf.getTriplesOutputPath());

  job.setInputFormatClass(SequenceFileInputFormat.class);
  LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);

  job.setSortComparatorClass(TripleSPOComparator.class);
  job.setGroupingComparatorClass(TripleSPOComparator.class);

  job.setPartitionerClass(TotalOrderPartitioner.class);

  job.setOutputKeyClass(TripleSPOWritable.class);
  job.setOutputValueClass(NullWritable.class);

  job.setNumReduceTasks(this.conf.getTriplesReducers());

  System.out.println("Sampling started");
  InputSampler.writePartitionFile(job,
      new InputSampler.IntervalSampler<Text, Text>(this.conf.getSampleProbability()));
  String partitionFile = TotalOrderPartitioner.getPartitionFile(job.getConfiguration());
  URI partitionUri = new URI(partitionFile + "#" + TotalOrderPartitioner.DEFAULT_PATH);
  DistributedCache.addCacheFile(partitionUri, job.getConfiguration());
  DistributedCache.createSymlink(job.getConfiguration());
  System.out.println("Sampling finished");

  SequenceFileOutputFormat.setCompressOutput(job, true);
  SequenceFileOutputFormat.setOutputCompressorClass(job, com.hadoop.compression.lzo.LzoCodec.class);
  SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);

  jobOK = job.waitForCompletion(true);

  return jobOK;
}
public int run(String[] args) throws Exception {
  SortConfig config = new SortConfig();
  config.fromArray(args);

  Job job = Job.getInstance(getConf());
  job.setJobName("sort");
  job.setJarByClass(SortDriver.class);

  // define the paths
  Path inputPath = new Path(config.getInput());
  Path partitionFilePath = new Path(config.getPartition());
  Path outputPath = new Path(config.getOutput());
  Path metaPath = new Path(config.getMeta());
  LOGGER.info("use " + inputPath.toString() + " as sort input");
  LOGGER.info("use " + partitionFilePath.toString() + " as partition");
  LOGGER.info("use " + outputPath.toString() + " as sort output");
  LOGGER.info("use " + metaPath.toString() + " as meta output");

  // define the mapper
  // use the identity mapper, which is the default implementation
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(Text.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  SequenceFileInputFormat.setInputPaths(job, inputPath);

  // define the reducer
  job.getConfiguration().set(SortReducer.META_BASE_CONFIG_NAME, metaPath.toString());
  job.setReducerClass(SortReducer.class);
  job.setNumReduceTasks(NUM_REDUCER);

  // use text output for easier debugging; a sequence file would likely be faster
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(Text.class);
  TextOutputFormat.setOutputPath(job, outputPath);

  // set the partitioner and its partition file
  job.setPartitionerClass(TotalOrderPartitioner.class);
  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFilePath);

  // set the sampler
  InputSampler.writePartitionFile(job, new InputSampler.RandomSampler(1, 10000));

  // set multiple outputs
  MultipleOutputs.addNamedOutput(job, "meta", TextOutputFormat.class, IntWritable.class, Text.class);

  // clean up the old output paths
  outputPath.getFileSystem(job.getConfiguration()).delete(outputPath, true);
  metaPath.getFileSystem(job.getConfiguration()).delete(metaPath, true);

  // run the job and wait until it completes
  return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * The MapReduce driver - setup and launch the job.
 *
 * @param args the command-line arguments
 * @return the process exit code
 * @throws Exception if something goes wrong
 */
public int run(final String[] args) throws Exception {
  int numReducers = 2;

  Cli cli = Cli.builder().setArgs(args).addOptions(CliOpts.values()).build();
  int result = cli.runCmd();
  if (result != 0) {
    return result;
  }

  Path input = new Path(cli.getArgValueAsString(CliOpts.INPUT));
  Path partitionFile = new Path(cli.getArgValueAsString(CliOpts.PARTITION));
  Path output = new Path(cli.getArgValueAsString(CliOpts.OUTPUT));

  InputSampler.Sampler<Text, Text> sampler =
      new InputSampler.RandomSampler<Text, Text>(0.1, 10000, 10);

  Configuration conf = super.getConf();

  Job job = new Job(conf);
  job.setJarByClass(TotalSortMapReduce.class);

  job.setNumReduceTasks(numReducers);

  job.setInputFormatClass(KeyValueTextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setPartitionerClass(TotalOrderPartitioner.class);

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, output);

  InputSampler.writePartitionFile(job, sampler);

  URI partitionUri = new URI(partitionFile.toString() + "#" + "_sortPartitioning");
  DistributedCache.addCacheFile(partitionUri, conf);

  if (job.waitForCompletion(true)) {
    return 0;
  }
  return 1;
}
/**
 * Sorts a Hadoop sequence file.
 *
 * @param inputFileName  The name of the file to sort.
 * @param outputFileName The name of the sorted file.
 * @throws Exception if either of the two jobs fails.
 */
public void run(String inputFileName, String outputFileName) throws Exception {
  int numThreads = conf.getInt("ldbc.snb.datagen.generator.numThreads", 1);

  if (keySetterName != null) {
    conf.set("keySetterClassName", keySetterName);
  }

  /** First job: sort the key-value pairs and count the number of elements processed by each reducer. **/
  Job jobSort = Job.getInstance(conf, "Sorting " + inputFileName);

  FileInputFormat.setInputPaths(jobSort, new Path(inputFileName));
  FileOutputFormat.setOutputPath(jobSort,
      new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rankIntermediate"));

  if (keySetterName != null) {
    jobSort.setMapperClass(HadoopFileRankerSortMapper.class);
  }
  jobSort.setMapOutputKeyClass(K);
  jobSort.setMapOutputValueClass(V);
  jobSort.setOutputKeyClass(BlockKey.class);
  jobSort.setOutputValueClass(V);
  jobSort.setNumReduceTasks(numThreads);
  jobSort.setReducerClass(HadoopFileRankerSortReducer.class);
  jobSort.setJarByClass(V);
  jobSort.setInputFormatClass(SequenceFileInputFormat.class);
  jobSort.setOutputFormatClass(SequenceFileOutputFormat.class);

  InputSampler.Sampler sampler = new InputSampler.RandomSampler(0.1, 1000);
  TotalOrderPartitioner.setPartitionFile(jobSort.getConfiguration(),
      new Path(inputFileName + "_partition.lst"));
  InputSampler.writePartitionFile(jobSort, sampler);
  jobSort.setPartitionerClass(TotalOrderPartitioner.class);

  if (!jobSort.waitForCompletion(true)) {
    throw new Exception("Sort job for " + inputFileName + " failed");
  }

  /** Second job: assign the rank to each element. **/
  Job jobRank = Job.getInstance(conf, "Sorting " + inputFileName);
  FileInputFormat.setInputPaths(jobRank,
      new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rankIntermediate"));
  FileOutputFormat.setOutputPath(jobRank, new Path(outputFileName));

  jobRank.setMapOutputKeyClass(BlockKey.class);
  jobRank.setMapOutputValueClass(V);
  jobRank.setOutputKeyClass(LongWritable.class);
  jobRank.setOutputValueClass(V);
  jobRank.setSortComparatorClass(BlockKeyComparator.class);
  jobRank.setNumReduceTasks(numThreads);
  jobRank.setReducerClass(HadoopFileRankerFinalReducer.class);
  jobRank.setJarByClass(V);
  jobRank.setInputFormatClass(SequenceFileInputFormat.class);
  jobRank.setOutputFormatClass(SequenceFileOutputFormat.class);
  jobRank.setPartitionerClass(HadoopFileRankerPartitioner.class);

  if (!jobRank.waitForCompletion(true)) {
    throw new Exception("Rank job for " + inputFileName + " failed");
  }

  try {
    // Clean up the intermediate rank files.
    FileSystem fs = FileSystem.get(conf);
    for (int i = 0; i < numThreads; ++i) {
      fs.delete(new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rank_" + i), true);
    }
    fs.delete(new Path(conf.get("ldbc.snb.datagen.serializer.hadoopDir") + "/rankIntermediate"), true);
  } catch (IOException e) {
    System.err.println(e.getMessage());
  }
}
/**
 * Fixes a potential overlap of generated regions/splits for a dataset with many identical keys.
 * For instance, let the samples be {1,1,1,1,3,3,3,5,6} and the number of partitions be 3.
 * The original implementation would produce the splits 1-1, 3-3, 3-6; note the overlap
 * between the 2nd and 3rd partitions.
 *
 * @param job
 * @param sampler
 * @param <K>
 * @param <V>
 * @throws IOException
 * @throws ClassNotFoundException
 * @throws InterruptedException
 */
@SuppressWarnings("unchecked")
public static <K, V> void writePartitionFile(Job job, InputSampler.Sampler<K, V> sampler)
    throws IOException, ClassNotFoundException, InterruptedException {
  LinkedList<K> splits = new LinkedList<K>();
  Configuration conf = job.getConfiguration();

  final InputFormat inf = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  int numPartitions = job.getNumReduceTasks();

  K[] samples = sampler.getSample(inf, job);
  LOG.info("Using " + samples.length + " samples");

  RawComparator<K> comparator = (RawComparator<K>) job.getGroupingComparator();
  Arrays.sort(samples, comparator);

  Path dst = new Path(TotalOrderPartitioner.getPartitionFile(conf));
  FileSystem fs = dst.getFileSystem(conf);
  if (fs.exists(dst)) {
    fs.delete(dst, false);
  }
  SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, dst,
      job.getMapOutputKeyClass(), NullWritable.class);
  NullWritable nullValue = NullWritable.get();

  float stepSize = samples.length / (float) numPartitions;
  K lastKey = null;
  K currentKey = null;
  int lastKeyIndex = -1;

  for (int i = 1; i < numPartitions; ++i) {
    int currentKeyOffset = Math.round(stepSize * i);
    if (lastKeyIndex > currentKeyOffset) {
      long keyOffset = lastKeyIndex - currentKeyOffset;
      float errorRate = (float) keyOffset / samples.length;
      LOG.warn(String.format(
          "Partitions overlap. Consider using a different Sampler "
              + "and/or increasing the number of samples and/or using more splits to take samples from. "
              + "Next sample would have been %s; key overlaps by a distance of %d (factor %f)",
          samples[currentKeyOffset], keyOffset, errorRate));
      currentKeyOffset = lastKeyIndex + 1;
    }
    currentKey = samples[currentKeyOffset];

    // Skip samples equal to the previously chosen split key so that split keys strictly increase.
    while (lastKey != null && comparator.compare(currentKey, lastKey) == 0) {
      currentKeyOffset++;
      if (currentKeyOffset >= samples.length) {
        LOG.info("Last 10 elements:");
        for (int d = samples.length - 1; d > samples.length - 11; d--) {
          LOG.debug(samples[d]);
        }
        throw new IOException("Not enough samples, stopped at partition " + i);
      }
      currentKey = samples[currentKeyOffset];
    }
    writer.append(currentKey, nullValue);
    lastKey = currentKey;
    lastKeyIndex = currentKeyOffset;
    splits.add(currentKey);
  }
  writer.close();

  LOG.info("********************************************* ");
  LOG.info(" START KEYs for new Regions: ");
  for (K split : splits) {
    LOG.info("* " + split.toString());
  }
}
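// To make the duplicate-key problem described in the javadoc above concrete, here is a
// small self-contained illustration (not part of the original class) that contrasts naive
// boundary picks with the duplicate-skipping selection used above, for a sample run long
// enough that the naive picks actually collide.

import java.util.ArrayList;
import java.util.List;

public final class SplitSelectionDemo {

  public static void main(String[] args) {
    int[] samples = {1, 1, 1, 1, 1, 1, 1, 5, 6}; // already sorted
    int numPartitions = 3;
    float stepSize = samples.length / (float) numPartitions;

    // Naive selection: take the sample at every stepSize boundary.
    List<Integer> naive = new ArrayList<Integer>();
    for (int i = 1; i < numPartitions; ++i) {
      naive.add(samples[Math.round(stepSize * i)]);
    }
    // Prints [1, 1]: the duplicate split key leaves the middle key range empty,
    // and regions derived from these split keys collide at key 1.
    System.out.println("naive splits   = " + naive);

    // Duplicate-skipping selection, as in writePartitionFile() above: advance past
    // samples equal to the previously chosen split key (the real method throws
    // IOException if it runs out of samples while doing so).
    List<Integer> deduped = new ArrayList<Integer>();
    Integer last = null;
    for (int i = 1; i < numPartitions; ++i) {
      int offset = Math.round(stepSize * i);
      while (last != null && offset < samples.length && samples[offset] == last) {
        offset++;
      }
      deduped.add(samples[offset]);
      last = samples[offset];
    }
    // Prints [1, 5]: every chosen split key is strictly greater than the previous one.
    System.out.println("deduped splits = " + deduped);
  }
}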
public static Job createSubmittableJob(Configuration conf, String[] args)
    throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
  Path inputDir = new Path(args[0]);
  Path outputDir = new Path(args[1]);
  boolean createPartitionFile = Boolean.parseBoolean(args[2]);

  Job job = Job.getInstance(conf, "Import delicious RSS feed into Hush tables.");
  job.setJarByClass(BulkImportJobExample.class);

  job.setInputFormatClass(TextInputFormat.class);
  // conf.setLong("hbase.hregion.max.filesize", 64 * 1024);
  FileInputFormat.setInputPaths(job, inputDir);

  job.setMapperClass(BulkImportMapper.class);
  job.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job.setMapOutputValueClass(Put.class);

  job.setPartitionerClass(TotalOrderPartitioner.class);

  job.setReducerClass(PutSortReducer.class);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);

  job.setOutputFormatClass(HFileOutputFormat.class);
  HFileOutputFormat.setOutputPath(job, outputDir);
  HFileOutputFormat.setCompressOutput(job, true);
  HFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
  job.getConfiguration().set("hfile.compression", "gz");
  // job.getConfiguration().setFloat("mapred.job.shuffle.input.buffer.percent", 0.5f);
  // job.setNumReduceTasks(30);

  Path partitionsPath = new Path(job.getWorkingDirectory(), "partitions_" + System.currentTimeMillis());
  TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionsPath);

  if (createPartitionFile) {
    VerboseInputSampler.Sampler<KeyValue, ImmutableBytesWritable> sampler =
        new VerboseInputSampler.VerboseRandomSampler<KeyValue, ImmutableBytesWritable>(
            0.05, 1000000, 30); // use 0.1 for real sampling

    LOG.info("Sampling key space");
    VerboseInputSampler.writePartitionFile(job, sampler);
    LOG.info("Sampling done");
  }

  URI cacheUri = new URI(partitionsPath.toString() + "#" + TotalOrderPartitioner.DEFAULT_PATH);
  DistributedCache.addCacheFile(cacheUri, job.getConfiguration());
  DistributedCache.createSymlink(job.getConfiguration());

  return job;
}
/**
 * Sorts a Hadoop sequence file.
 *
 * @param inputFileName  The name of the file to sort.
 * @param outputFileName The name of the sorted file.
 * @throws Exception
 */
public void run(String inputFileName, String outputFileName) throws Exception {
  int numThreads = conf.getInt("numThreads", 1);

  /** First job: sort the key-value pairs and count the number of elements processed by each reducer. **/
  Job jobSort = new Job(conf, "Sorting " + inputFileName);
  FileInputFormat.setInputPaths(jobSort, new Path(inputFileName));
  FileOutputFormat.setOutputPath(jobSort, new Path(conf.get("outputDir") + "/hadoop/rankIntermediate"));

  jobSort.setMapOutputKeyClass(K);
  jobSort.setMapOutputValueClass(V);
  jobSort.setOutputKeyClass(ComposedKey.class);
  jobSort.setOutputValueClass(V);
  jobSort.setNumReduceTasks(numThreads);
  jobSort.setReducerClass(HadoopFileRankerSortReducer.class);
  jobSort.setJarByClass(V);
  jobSort.setInputFormatClass(SequenceFileInputFormat.class);
  jobSort.setOutputFormatClass(SequenceFileOutputFormat.class);

  InputSampler.Sampler sampler = new InputSampler.RandomSampler(0.1, 1000);
  TotalOrderPartitioner.setPartitionFile(jobSort.getConfiguration(),
      new Path(inputFileName + "_partition.lst"));
  InputSampler.writePartitionFile(jobSort, sampler);
  jobSort.setPartitionerClass(TotalOrderPartitioner.class);

  jobSort.waitForCompletion(true);

  /** Second job: assign the rank to each element. **/
  Job jobRank = new Job(conf, "Sorting " + inputFileName);
  FileInputFormat.setInputPaths(jobRank, new Path(conf.get("outputDir") + "/hadoop/rankIntermediate"));
  FileOutputFormat.setOutputPath(jobRank, new Path(outputFileName));

  jobRank.setMapOutputKeyClass(ComposedKey.class);
  jobRank.setMapOutputValueClass(V);
  jobRank.setOutputKeyClass(LongWritable.class);
  jobRank.setOutputValueClass(V);
  jobRank.setSortComparatorClass(ComposedKeyComparator.class);
  jobRank.setNumReduceTasks(numThreads);
  jobRank.setReducerClass(HadoopFileRankerFinalReducer.class);
  jobRank.setJarByClass(V);
  jobRank.setInputFormatClass(SequenceFileInputFormat.class);
  jobRank.setOutputFormatClass(SequenceFileOutputFormat.class);
  jobRank.setPartitionerClass(HadoopFileRankerPartitioner.class);

  jobRank.waitForCompletion(true);

  try {
    // Clean up the intermediate rank files.
    FileSystem fs = FileSystem.get(conf);
    for (int i = 0; i < numThreads; ++i) {
      fs.delete(new Path(conf.get("outputDir") + "/hadoop/rank_" + i), true);
    }
    fs.delete(new Path(conf.get("outputDir") + "/hadoop/rankIntermediate"), true);
  } catch (IOException e) {
    System.err.println(e.getMessage());
  }
}
@SuppressWarnings("unchecked") @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Path inputPath = new Path(args[0]); Path partitionFile = new Path(args[1] + "_partitions.lst"); Path outputStage = new Path(args[1] + "_staging"); Path outputOrder = new Path(args[1]); // Configure job to prepare for sampling Job sampleJob = new Job(conf, "TotalOrderSortingStage"); sampleJob.setJarByClass(TotalOrderSortingStage.class); // Use the mapper implementation with zero reduce tasks sampleJob.setMapperClass(LastAccessMapper.class); sampleJob.setNumReduceTasks(0); sampleJob.setOutputKeyClass(Text.class); sampleJob.setOutputValueClass(Text.class); TextInputFormat.setInputPaths(sampleJob, inputPath); // Set the output format to a sequence file sampleJob.setOutputFormatClass(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setOutputPath(sampleJob, outputStage); // Submit the job and get completion code. int code = sampleJob.waitForCompletion(true) ? 0 : 1; if (code == 0) { Job orderJob = new Job(conf, "TotalOrderSortingStage"); orderJob.setJarByClass(TotalOrderSortingStage.class); // Here, use the identity mapper to output the key/value pairs in // the SequenceFile orderJob.setMapperClass(Mapper.class); orderJob.setReducerClass(ValuesReducer.class); // Set the number of reduce tasks to an appropriate number for the // amount of data being sorted orderJob.setNumReduceTasks(10); // Use Hadoop's TotalOrderPartitioner class orderJob.setPartitionerClass(TotalOrderPartitioner.class); // Set the partition file TotalOrderPartitioner.setPartitionFile(orderJob.getConfiguration(), partitionFile); orderJob.setOutputKeyClass(Text.class); orderJob.setOutputValueClass(Text.class); // Set the input to the previous job's output orderJob.setInputFormatClass(SequenceFileInputFormat.class); SequenceFileInputFormat.setInputPaths(orderJob, outputStage); // Set the output path to the command line parameter TextOutputFormat.setOutputPath(orderJob, outputOrder); // Set the separator to an empty string orderJob.getConfiguration().set( "mapred.textoutputformat.separator", ""); // Use the InputSampler to go through the output of the previous // job, sample it, and create the partition file InputSampler.writePartitionFile(orderJob, new InputSampler.RandomSampler(.001, 10000)); // Submit the job code = orderJob.waitForCompletion(true) ? 0 : 2; } // Clean up the partition file and the staging directory FileSystem.get(new Configuration()).delete(partitionFile, false); FileSystem.get(new Configuration()).delete(outputStage, true); return code; }
/**
 * Configure a MapReduce Job to perform an incremental load into the given
 * table. This
 * <ul>
 * <li>Inspects the table to configure a total order partitioner</li>
 * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
 * <li>Sets the number of reduce tasks to match the current number of regions</li>
 * <li>Sets the output key/value class to match HFileOutputFormat's requirements</li>
 * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
 * PutSortReducer)</li>
 * </ul>
 * The user should be sure to set the map output value class to either KeyValue or Put before
 * running this function.
 */
public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
  Configuration conf = job.getConfiguration();
  job.setPartitionerClass(TotalOrderPartitioner.class);
  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  // Based on the configured map output class, set the correct reducer to properly
  // sort the incoming values.
  // TODO it would be nice to pick one or the other of these formats.
  if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
  } else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
  } else {
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
  }

  LOG.info("Looking up current regions for table " + table);
  List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
  LOG.info("Configuring " + startKeys.size() + " reduce partitions "
      + "to match current region count");
  job.setNumReduceTasks(startKeys.size());

  Path partitionsPath = new Path(job.getWorkingDirectory(), "partitions_" + UUID.randomUUID());
  LOG.info("Writing partition information to " + partitionsPath);

  FileSystem fs = partitionsPath.getFileSystem(conf);
  writePartitions(conf, partitionsPath, startKeys);
  // makeQualified() returns the qualified path; keep the result so the cache URI below is fully qualified
  partitionsPath = partitionsPath.makeQualified(fs);
  URI cacheUri;
  try {
    cacheUri = new URI(partitionsPath.toString() + "#" + TotalOrderPartitioner.DEFAULT_PATH);
  } catch (URISyntaxException e) {
    throw new IOException(e);
  }
  DistributedCache.addCacheFile(cacheUri, conf);
  DistributedCache.createSymlink(conf);

  // Set compression algorithms based on column families
  configureCompression(table, conf);
  configureBloomType(table, conf);

  TableMapReduceUtil.addDependencyJars(job);

  LOG.info("Incremental table output configured.");
}
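// A minimal end-to-end sketch (illustrative only, not code from the original source, written
// against 0.9x-era HBase APIs) of how configureIncrementalLoad() is typically used: configure
// the job, run it, then hand the generated HFiles to the bulk loader. The column family "cf",
// the qualifier "q", and the tab-separated input format are assumptions for the example.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public final class BulkLoadDriverSketch {

  /** Example mapper: parses "rowkey<TAB>value" lines into Puts. */
  static class PutMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split("\t", 2);
      byte[] row = Bytes.toBytes(fields[0]);
      Put put = new Put(row);
      put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(fields[1]));
      context.write(new ImmutableBytesWritable(row), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, args[2]); // target table name

    Job job = new Job(conf, "bulk-load " + args[2]);
    job.setJarByClass(BulkLoadDriverSketch.class);
    job.setInputFormatClass(TextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));

    // The mapper must emit ImmutableBytesWritable row keys and Put values,
    // which is what configureIncrementalLoad() expects for PutSortReducer.
    job.setMapperClass(PutMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(Put.class);

    Path hfileDir = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, hfileDir);
    HFileOutputFormat.configureIncrementalLoad(job, table);

    if (job.waitForCompletion(true)) {
      // Move the generated HFiles into the table's regions.
      new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, table);
    }
  }
}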