public static void run(Configuration conf, Path inputPath, Path output, double params)
    throws IOException, ClassNotFoundException, InterruptedException {
  String jobName = "calculating parameter";
  conf.set("params", String.valueOf(params));
  Job job = new Job(conf, jobName);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(indexToCountWritable.class);
  job.setOutputKeyClass(twoDimensionIndexWritable.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setMapperClass(CalParamsMapper.class);
  job.setReducerClass(CalParamsReducer.class);
  FileInputFormat.addInputPath(job, inputPath);
  FileOutputFormat.setOutputPath(job, output);
  job.setJarByClass(LDADriver.class);
  if (!job.waitForCompletion(true)) {
    throw new InterruptedException("calculating parameter failed");
  }
}
public static void Run(String input, Configuration conf)
    throws IOException, ClassNotFoundException, InterruptedException {
  Job job = Job.getInstance(conf);
  // job.setJobName(Hdfs2es.class.getName());
  job.setJarByClass(Hdfs2es.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setMapperClass(MapTask.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(EsOutputFormat.class);
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(Text.class);
  FileInputFormat.addInputPath(job, new Path(input));
  job.setSpeculativeExecution(false);
  job.waitForCompletion(true);
}
private static void joinAs(String jointype,
    Class<? extends SimpleCheckerMapBase<?>> map,
    Class<? extends SimpleCheckerReduceBase> reduce) throws Exception {
  final int srcs = 4;
  Configuration conf = new Configuration();
  Path base = cluster.getFileSystem().makeQualified(new Path("/" + jointype));
  Path[] src = writeSimpleSrc(base, conf, srcs);
  conf.set(CompositeInputFormat.JOIN_EXPR,
      CompositeInputFormat.compose(jointype, SequenceFileInputFormat.class, src));
  conf.setInt("testdatamerge.sources", srcs);
  Job job = Job.getInstance(conf);
  job.setInputFormatClass(CompositeInputFormat.class);
  FileOutputFormat.setOutputPath(job, new Path(base, "out"));
  job.setMapperClass(map);
  job.setReducerClass(reduce);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(IntWritable.class);
  job.waitForCompletion(true);
  assertTrue("Job failed", job.isSuccessful());
  if ("outer".equals(jointype)) {
    checkOuterConsistency(job, src);
  }
  base.getFileSystem(conf).delete(base, true);
}
/**
 * Job configuration.
 */
public static Job configureJob(Configuration conf, String[] args) throws IOException {
  Path inputPath = new Path(args[0]);
  String tableName = args[1];
  Job job = new Job(conf, NAME + "_" + tableName);
  job.setJarByClass(Uploader.class);
  FileInputFormat.setInputPaths(job, inputPath);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setMapperClass(Uploader.class);
  // No reducers. Just write straight to table. Call initTableReducerJob
  // because it sets up the TableOutputFormat.
  TableMapReduceUtil.initTableReducerJob(tableName, null, job);
  job.setNumReduceTasks(0);
  return job;
}
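// Hedged usage sketch (not part of the original snippet): how the configured Job
// might be driven from a main() method. HBaseConfiguration.create() and the
// SampleUploader class name are assumptions taken from the surrounding context
// (see the test snippet further below); only configureJob(...) is defined above.
public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();   // assumed HBase-aware configuration
  Job job = SampleUploader.configureJob(conf, args);  // args: <input path> <table name>
  System.exit(job.waitForCompletion(true) ? 0 : 1);   // block until the upload job finishes
}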
protected static Job getJob(String jobname, Configuration inputConf,
    String inputpath, String outputpath) throws Exception {
  final Configuration conf = new Configuration(inputConf);
  conf.set("fileoutputpath", outputpath);
  final FileSystem fs = FileSystem.get(conf);
  if (fs.exists(new Path(outputpath))) {
    fs.delete(new Path(outputpath), true);
  }
  fs.close();
  final Job job = Job.getInstance(conf, jobname);
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  FileInputFormat.addInputPath(job, new Path(inputpath));
  FileOutputFormat.setOutputPath(job, new Path(outputpath));
  return job;
}
private Job getJob(Configuration conf, String jobName, String inputpath, String outputpath)
    throws IOException {
  final FileSystem fs = FileSystem.get(conf);
  if (fs.exists(new Path(outputpath))) {
    fs.delete(new Path(outputpath), true);
  }
  fs.close();
  final Job job = Job.getInstance(conf, jobName);
  job.setJarByClass(NonSortTestMR.class);
  job.setMapperClass(NonSortTestMR.Map.class);
  job.setReducerClass(NonSortTestMR.KeyHashSumReduce.class);
  job.setOutputKeyClass(Text.class);
  job.setMapOutputValueClass(IntWritable.class);
  job.setOutputValueClass(LongWritable.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  FileInputFormat.addInputPath(job, new Path(inputpath));
  FileOutputFormat.setOutputPath(job, new Path(outputpath));
  return job;
}
public static Job getCompressJob(String jobname, Configuration conf,
    String inputpath, String outputpath) throws Exception {
  Job job = Job.getInstance(conf, jobname + "-CompressMapperJob");
  job.setJarByClass(CompressMapper.class);
  job.setMapperClass(TextCompressMapper.class);
  job.setOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  // If the output path already exists, delete it.
  final FileSystem hdfs = FileSystem.get(new ScenarioConfiguration());
  if (hdfs.exists(new Path(outputpath))) {
    hdfs.delete(new Path(outputpath), true);
  }
  hdfs.close();
  job.setInputFormatClass(SequenceFileInputFormat.class);
  FileInputFormat.addInputPath(job, new Path(inputpath));
  FileOutputFormat.setOutputPath(job, new Path(outputpath));
  return job;
}
public KVJob(String jobname, Configuration conf, Class<?> keyclass, Class<?> valueclass,
    String inputpath, String outputpath) throws Exception {
  job = Job.getInstance(conf, jobname);
  job.setJarByClass(KVJob.class);
  job.setMapperClass(KVJob.ValueMapper.class);
  job.setOutputKeyClass(keyclass);
  job.setMapOutputValueClass(valueclass);
  if (conf.get(TestConstants.NATIVETASK_KVTEST_CREATEFILE).equals("true")) {
    final FileSystem fs = FileSystem.get(conf);
    fs.delete(new Path(inputpath), true);
    fs.close();
    final TestInputFile testfile = new TestInputFile(
        Integer.valueOf(conf.get(TestConstants.FILESIZE_KEY, "1000")),
        keyclass.getName(), valueclass.getName(), conf);
    StopWatch sw = new StopWatch().start();
    testfile.createSequenceTestFile(inputpath);
    LOG.info("Created test file " + inputpath + " in " + sw.now(TimeUnit.MILLISECONDS) + "ms");
  }
  job.setInputFormatClass(SequenceFileInputFormat.class);
  FileInputFormat.addInputPath(job, new Path(inputpath));
  FileOutputFormat.setOutputPath(job, new Path(outputpath));
}
private boolean genBigItemMap(String input, String output)
    throws IOException, ClassNotFoundException, InterruptedException {
  Job job = Job.getInstance(this.getConf(), "Computing items remapping for " + this.input);
  job.setJarByClass(TopPIoverHadoop.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(input));
  FileOutputFormat.setOutputPath(job, new Path(output));
  job.setMapperClass(InverseMapper.class);
  job.setReducerClass(ItemBigRebasingReducer.class);
  job.setNumReduceTasks(1);
  return job.waitForCompletion(true);
}
public void run(Configuration conf, Path matrixInputPath, String meanSpanFileName,
    Path matrixOutputPath) throws IOException, InterruptedException, ClassNotFoundException {
  conf.set(MEANSPANOPTION, meanSpanFileName);
  Job job = new Job(conf);
  job.setJobName("Norm2Job");
  job.setJarByClass(Norm2Job.class);
  FileSystem fs = FileSystem.get(matrixInputPath.toUri(), conf);
  matrixInputPath = fs.makeQualified(matrixInputPath);
  matrixOutputPath = fs.makeQualified(matrixOutputPath);
  FileInputFormat.addInputPath(job, matrixInputPath);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  FileOutputFormat.setOutputPath(job, matrixOutputPath);
  job.setMapperClass(MyMapper.class);
  job.setReducerClass(MyReducer.class);
  job.setNumReduceTasks(1);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(DoubleWritable.class);
  job.submit();
  job.waitForCompletion(true);
}
public static void runIteration(Configuration conf, Path corpusInput, Path modelInput,
    Path modelOutput, int iterationNumber, int maxIterations, int numReduceTasks)
    throws IOException, ClassNotFoundException, InterruptedException {
  String jobName = String.format("Iteration %d of %d, input path: %s",
      iterationNumber, maxIterations, modelInput);
  log.info("About to run: " + jobName);
  Job job = new Job(conf, jobName);
  job.setJarByClass(CVB0Driver.class);
  job.setMapperClass(CachingCVB0Mapper.class);
  job.setCombinerClass(VectorSumReducer.class);
  job.setReducerClass(VectorSumReducer.class);
  job.setNumReduceTasks(numReduceTasks);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(VectorWritable.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  FileInputFormat.addInputPath(job, corpusInput);
  FileOutputFormat.setOutputPath(job, modelOutput);
  setModelPaths(job, modelInput);
  HadoopUtil.delete(conf, modelOutput);
  if (!job.waitForCompletion(true)) {
    throw new InterruptedException(
        String.format("Failed to complete iteration %d stage 1", iterationNumber));
  }
}
public static void runJob(Path input, Path output)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = new Configuration();
  Job job = new Job(conf, "UnitVectorizerJob");
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(VectorWritable.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setMapperClass(UnitVectorizerMapper.class);
  job.setNumReduceTasks(0);
  FileInputFormat.addInputPath(job, input);
  FileOutputFormat.setOutputPath(job, output);
  job.setJarByClass(UnitVectorizerJob.class);
  boolean succeeded = job.waitForCompletion(true);
  if (!succeeded) {
    throw new IllegalStateException("Job failed!");
  }
}
private static double calculatePerplexity(Configuration conf, Path corpusPath, Path modelPath,
    int iteration) throws IOException, ClassNotFoundException, InterruptedException {
  String jobName = "Calculating perplexity for " + modelPath;
  log.info("About to run: " + jobName);
  Job job = new Job(conf, jobName);
  job.setJarByClass(CachingCVB0PerplexityMapper.class);
  job.setMapperClass(CachingCVB0PerplexityMapper.class);
  job.setCombinerClass(DualDoubleSumReducer.class);
  job.setReducerClass(DualDoubleSumReducer.class);
  job.setNumReduceTasks(1);
  job.setOutputKeyClass(DoubleWritable.class);
  job.setOutputValueClass(DoubleWritable.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  FileInputFormat.addInputPath(job, corpusPath);
  Path outputPath = perplexityPath(modelPath.getParent(), iteration);
  FileOutputFormat.setOutputPath(job, outputPath);
  setModelPaths(job, modelPath);
  HadoopUtil.delete(conf, outputPath);
  if (!job.waitForCompletion(true)) {
    throw new InterruptedException("Failed to calculate perplexity for: " + modelPath);
  }
  return readPerplexity(conf, modelPath.getParent(), iteration);
}
/**
 * Process as a map-reduce job. The number of reduce tasks is set to the number of clusters
 * present in the output, so that each cluster's vectors are written to their own part file.
 *
 * @param conf
 *          The Hadoop configuration.
 * @param input
 *          The output path provided to the clustering algorithm, which is to be post-processed.
 *          Hint: the path of the directory containing clusters-*-final and clusteredPoints.
 * @param output
 *          The path where the post-processed data is stored.
 */
private static void postProcessMR(Configuration conf, Path input, Path output)
    throws IOException, InterruptedException, ClassNotFoundException {
  Job job = new Job(conf, "ClusterOutputPostProcessor Driver running over input: " + input);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setMapperClass(ClusterOutputPostProcessorMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(VectorWritable.class);
  job.setReducerClass(ClusterOutputPostProcessorReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(VectorWritable.class);
  int numberOfClusters = ClusterCountReader.getNumberOfClusters(input, conf);
  job.setNumReduceTasks(numberOfClusters);
  job.setJarByClass(ClusterOutputPostProcessorDriver.class);
  FileInputFormat.addInputPath(job, new Path(input, new Path("clusteredPoints")));
  FileOutputFormat.setOutputPath(job, output);
  if (!job.waitForCompletion(true)) {
    throw new InterruptedException("ClusterOutputPostProcessor Job failed processing " + input);
  }
}
private static Job writeTopicModel(Configuration conf, Path modelInput, Path output)
    throws IOException, InterruptedException, ClassNotFoundException {
  String jobName = String.format("Writing final topic/term distributions from %s to %s",
      modelInput, output);
  log.info("About to run: " + jobName);
  Job job = new Job(conf, jobName);
  job.setJarByClass(CVB0Driver.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setMapperClass(CVB0TopicTermVectorNormalizerMapper.class);
  job.setNumReduceTasks(0);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(VectorWritable.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  FileInputFormat.addInputPath(job, modelInput);
  FileOutputFormat.setOutputPath(job, output);
  job.submit();
  return job;
}
private static Job writeDocTopicInference(Configuration conf, Path corpus, Path modelInput,
    Path output) throws IOException, ClassNotFoundException, InterruptedException {
  String jobName = String.format("Writing final document/topic inference from %s to %s",
      corpus, output);
  log.info("About to run: " + jobName);
  Job job = new Job(conf, jobName);
  job.setMapperClass(CVB0DocInferenceMapper.class);
  job.setNumReduceTasks(0);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(VectorWritable.class);
  FileSystem fs = FileSystem.get(corpus.toUri(), conf);
  if (modelInput != null && fs.exists(modelInput)) {
    FileStatus[] statuses = fs.listStatus(modelInput, PathFilters.partFilter());
    URI[] modelUris = new URI[statuses.length];
    for (int i = 0; i < statuses.length; i++) {
      modelUris[i] = statuses[i].getPath().toUri();
    }
    DistributedCache.setCacheFiles(modelUris, conf);
  }
  FileInputFormat.addInputPath(job, corpus);
  FileOutputFormat.setOutputPath(job, output);
  job.setJarByClass(CVB0Driver.class);
  job.submit();
  return job;
}
@Override
public void createVectors(Path input, Path output, VectorizerConfig config)
    throws IOException, ClassNotFoundException, InterruptedException {
  // do this for convenience of using prepareJob
  Job job = HadoopUtil.prepareJob(input, output, SequenceFileInputFormat.class,
      EncodingMapper.class, Text.class, VectorWritable.class, SequenceFileOutputFormat.class,
      config.getConf());
  Configuration conf = job.getConfiguration();
  conf.set(EncodingMapper.USE_SEQUENTIAL, String.valueOf(config.isSequentialAccess()));
  conf.set(EncodingMapper.USE_NAMED_VECTORS, String.valueOf(config.isNamedVectors()));
  conf.set(EncodingMapper.ANALYZER_NAME, config.getAnalyzerClassName());
  conf.set(EncodingMapper.ENCODER_FIELD_NAME, config.getEncoderName());
  conf.set(EncodingMapper.ENCODER_CLASS, config.getEncoderClass());
  conf.set(EncodingMapper.CARDINALITY, String.valueOf(config.getCardinality()));
  job.setNumReduceTasks(0);
  boolean finished = job.waitForCompletion(true);
  log.info("result of run: {}", finished);
  if (!finished) {
    throw new IllegalStateException("Job failed!");
  }
}
@Override
public int run(String[] args) throws Exception {
  Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
  if (job == null) {
    return -1;
  }
  job.setNumReduceTasks(20);
  job.setJobName("WebTables Wordcount Job");
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setMapperClass(WordcountMapper.class);
  job.setReducerClass(WordcountReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setJarByClass(WordcountJob.class);
  return job.waitForCompletion(true) ? 0 : 1;
}
/**
 * Convert vectors to MeanShiftCanopies using Hadoop.
 */
private static void createCanopyFromVectorsMR(Configuration conf, Path input, Path output,
    DistanceMeasure measure) throws IOException, InterruptedException, ClassNotFoundException {
  conf.set(KMeansConfigKeys.DISTANCE_MEASURE_KEY, measure.getClass().getName());
  Job job = new Job(conf);
  job.setJarByClass(MeanShiftCanopyDriver.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(ClusterWritable.class);
  job.setMapperClass(MeanShiftCanopyCreatorMapper.class);
  job.setNumReduceTasks(0);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  FileInputFormat.setInputPaths(job, input);
  FileOutputFormat.setOutputPath(job, output);
  if (!job.waitForCompletion(true)) {
    throw new InterruptedException(
        "Mean Shift createCanopyFromVectorsMR failed on input " + input);
  }
}
public void run(String inputFileName, String outputFileName) throws Exception {
  int numThreads = conf.getInt("ldbc.snb.datagen.generator.numThreads", 1);
  System.out.println("***************" + numThreads);
  conf.set("keySetterClassName", keySetterName);
  // First job: sort the key-value pairs and count the number of elements
  // processed by each reducer.
  Job job = Job.getInstance(conf, "Sorting " + inputFileName);
  FileInputFormat.setInputPaths(job, new Path(inputFileName));
  FileOutputFormat.setOutputPath(job, new Path(outputFileName));
  job.setMapOutputKeyClass(K);
  job.setMapOutputValueClass(V);
  job.setOutputKeyClass(TupleKey.class);
  job.setOutputValueClass(V);
  job.setNumReduceTasks(numThreads);
  job.setReducerClass(HadoopFileKeyChangerReducer.class);
  job.setJarByClass(V);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  if (!job.waitForCompletion(true)) {
    throw new Exception("Sorting job failed for input " + inputFileName);
  }
}
public static void total(String name, String in, String out)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = new Configuration();
  conf.set(QUERIED_NAME, name);
  Job job = Job.getInstance(new Cluster(conf), conf);
  job.setJarByClass(Total.class);
  // in
  if (!in.endsWith("/")) {
    in = in.concat("/");
  }
  in = in.concat("employees");
  SequenceFileInputFormat.addInputPath(job, new Path(in));
  job.setInputFormatClass(SequenceFileInputFormat.class);
  // map
  job.setMapperClass(TotalMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(DoubleWritable.class);
  // reduce
  job.setCombinerClass(TotalReducer.class);
  job.setReducerClass(TotalReducer.class);
  // out
  SequenceFileOutputFormat.setOutputPath(job, new Path(out));
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(DoubleWritable.class);
  job.waitForCompletion(true);
}
public static Job createJob(String name, String base) throws IOException {
  Configuration conf = new Configuration();
  conf.set(Total.QUERIED_NAME, name);
  Job job = Job.getInstance(new Cluster(conf), conf);
  job.setJarByClass(Cut.class);
  // in
  String in = base;
  if (!base.endsWith("/")) {
    in = in.concat("/");
  }
  in = in.concat("employees");
  SequenceFileInputFormat.addInputPath(job, new Path(in));
  job.setInputFormatClass(SequenceFileInputFormat.class);
  // map
  job.setMapperClass(CutMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Employee.class);
  // out
  SequenceFileOutputFormat.setOutputPath(job, new Path(base + "/tmp"));
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Employee.class);
  return job;
}
@SuppressWarnings("unchecked") public RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { Configuration conf = context.getConfiguration(); InputFormat<K, V> indirIF = (InputFormat)ReflectionUtils.newInstance( conf.getClass(INDIRECT_INPUT_FORMAT, SequenceFileInputFormat.class), conf); IndirectSplit is = ((IndirectSplit)split); return indirIF.createRecordReader(new FileSplit(is.getPath(), 0, is.getLength(), (String[])null), context); }
public static void run(Configuration conf, Path[] inputPath, Path outputPath)
    throws IOException, ClassNotFoundException, InterruptedException {
  String jobName = "init matrix";
  Job job = new Job(conf, jobName);
  job.setMapOutputKeyClass(twoDimensionIndexWritable.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(twoDimensionIndexWritable.class);
  job.setOutputValueClass(Text.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
  job.setMapperClass(InitMapper.class);
  job.setReducerClass(InitReducer.class);
  job.setNumReduceTasks(1);
  for (Path path : inputPath) {
    FileInputFormat.addInputPath(job, path);
  }
  Path output = new Path(outputPath, "initDir");
  FileOutputFormat.setOutputPath(job, output);
  job.setJarByClass(LDADriver.class);
  if (!job.waitForCompletion(true)) {
    throw new InterruptedException("Init failed");
  }
}
/**
 * Test SampleUploader from examples.
 */
@SuppressWarnings("unchecked")
@Test
public void testSampleUploader() throws Exception {
  Configuration configuration = new Configuration();
  Uploader uploader = new Uploader();
  Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context ctx = mock(Context.class);
  doAnswer(new Answer<Void>() {
    @Override
    public Void answer(InvocationOnMock invocation) throws Throwable {
      ImmutableBytesWritable writer = (ImmutableBytesWritable) invocation.getArguments()[0];
      Put put = (Put) invocation.getArguments()[1];
      assertEquals("row", Bytes.toString(writer.get()));
      assertEquals("row", Bytes.toString(put.getRow()));
      return null;
    }
  }).when(ctx).write(any(ImmutableBytesWritable.class), any(Put.class));
  uploader.map(null, new Text("row,family,qualifier,value"), ctx);

  Path dir = util.getDataTestDirOnTestFS("testSampleUploader");
  String[] args = { dir.toString(), "simpleTable" };
  Job job = SampleUploader.configureJob(configuration, args);
  assertEquals(SequenceFileInputFormat.class, job.getInputFormatClass());
}
private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean dumpHostnames,
    String expr) throws Exception {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("ReadHostDb: starting at " + sdf.format(start));

  Configuration conf = getConf();
  conf.setBoolean(HOSTDB_DUMP_HOMEPAGES, dumpHomepages);
  conf.setBoolean(HOSTDB_DUMP_HOSTNAMES, dumpHostnames);
  if (expr != null) {
    conf.set(HOSTDB_FILTER_EXPRESSION, expr);
  }
  conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
  conf.set("mapred.textoutputformat.separator", "\t");

  Job job = new Job(conf, "ReadHostDb");
  job.setJarByClass(ReadHostDb.class);
  FileInputFormat.addInputPath(job, new Path(hostDb, "current"));
  FileOutputFormat.setOutputPath(job, output);
  job.setMapperClass(ReadHostDbMapper.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(0);

  job.waitForCompletion(true);

  long end = System.currentTimeMillis();
  LOG.info("ReadHostDb: finished at " + sdf.format(end) + ", elapsed: "
      + TimingUtil.elapsedTime(start, end));
}
/**
 * Restore a {@code JavaPairRDD<Long, List<Writable>>} previously saved with
 * {@link #saveMapFile(String, JavaRDD)}.<br>
 * Note that if the keys are not required, simply use {@code restoreMapFile(...).values()}.
 *
 * @param path Path of the MapFile
 * @param sc   Spark context
 * @return The restored RDD, with the unique indices as the keys
 */
public static JavaPairRDD<Long, List<Writable>> restoreMapFile(String path, JavaSparkContext sc) {
  Configuration c = new Configuration();
  c.set(FileInputFormat.INPUT_DIR, FilenameUtils.normalize(path, true));
  JavaPairRDD<LongWritable, RecordWritable> pairRDD = sc.newAPIHadoopRDD(c,
      SequenceFileInputFormat.class, LongWritable.class, RecordWritable.class);
  return pairRDD.mapToPair(new RecordLoadPairFunction());
}
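// Hedged usage sketch (assumed names and path, not from the original source):
// restoring a previously saved MapFile and dropping the keys when only the
// records are needed, as the javadoc above suggests via restoreMapFile(...).values().
SparkConf sparkConf = new SparkConf().setAppName("restoreMapFileExample").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaPairRDD<Long, List<Writable>> indexedRecords = restoreMapFile("/tmp/saved-mapfile", sc);
JavaRDD<List<Writable>> recordsOnly = indexedRecords.values();  // keys not required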