private void createUserVisitsTableDirectly() throws IOException, URISyntaxException {
    log.info("Creating user visits...");

    Path rankings = new Path(options.getResultPath(), RANKINGS);
    Path fout = new Path(options.getResultPath(), USERVISITS);

    JobConf job = new JobConf(HiveData.class);
    String jobname = "Create uservisits";
    job.setJobName(jobname);
    setVisitsOptions(job);

    /*
     * Set distributed cache files for table generation.
     * Cache files include:
     *   1. user agents
     *   2. country codes and language codes
     *   3. search keys
     */
    Path uagentPath = new Path(options.getWorkPath(), uagentf);
    DistributedCache.addCacheFile(uagentPath.toUri(), job);

    Path countryPath = new Path(options.getWorkPath(), countryf);
    DistributedCache.addCacheFile(countryPath.toUri(), job);

    Path searchkeyPath = new Path(options.getWorkPath(), searchkeyf);
    DistributedCache.addCacheFile(searchkeyPath.toUri(), job);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(JoinBytesInt.class);

    // Two inputs feed the same job: the dummy seed file and the rankings table.
    MultipleInputs.addInputPath(job, dummy.getPath(),
        NLineInputFormat.class, DummyToAccessNoMapper.class);
    if (options.isSequenceOut()) {
        MultipleInputs.addInputPath(job, rankings,
            SequenceFileInputFormat.class, SequenceRankingsToUrlsMapper.class);
    } else {
        MultipleInputs.addInputPath(job, rankings,
            TextInputFormat.class, TextRankingsToUrlsMapper.class);
    }

    job.setCombinerClass(JoinBytesIntCombiner.class);
    job.setReducerClass(CreateUserVisitsReducer.class);

    if (options.getNumReds() > 0) {
        job.setNumReduceTasks(options.getNumReds());
    } else {
        job.setNumReduceTasks(Utils.getMaxNumReds());
    }
    // job.setNumReduceTasks(options.slots/2);

    if (options.isSequenceOut()) {
        job.setOutputFormat(SequenceFileOutputFormat.class);
    } else {
        job.setOutputFormat(TextOutputFormat.class);
    }

    if (null != options.getCodecClass()) {
        job.set("mapred.output.compression.type", "BLOCK");
        job.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, options.getCodecClass());
    }

    FileOutputFormat.setOutputPath(job, fout);

    log.info("Running Job: " + jobname);
    log.info("Dummy file " + dummy.getPath() + " as input");
    log.info("Rankings file " + rankings + " as input");
    log.info("Output file " + fout);
    JobClient.runJob(job);
    log.info("Finished Running Job: " + jobname);
}
private void appendFromArchive(Path harSrc, List<Path> relativePaths, Path harDst)
        throws IOException {
    Path outputPath = harDst;
    FileOutputFormat.setOutputPath(conf, outputPath);
    FileSystem outFs = outputPath.getFileSystem(conf);

    if (!outFs.exists(outputPath)) {
        throw new IOException("Invalid Output. HAR File " + outputPath + " doesn't exist");
    }
    if (outFs.isFile(outputPath)) {
        throw new IOException("Invalid Output. HAR File " + outputPath
            + " must be represented as a directory");
    }

    long totalSize = writeFromArchiveFilesToProcess(harSrc, relativePaths);

    // Make the source a har:// path.
    FileSystem fs1 = harSrc.getFileSystem(conf);
    URI uri = fs1.getUri();
    Path parentPath = new Path("har://" + "hdfs-" + uri.getHost() + ":" + uri.getPort()
        + fs1.makeQualified(harSrc).toUri().getPath());
    FileSystem fs = parentPath.getFileSystem(conf);

    conf.set(SRC_LIST_LABEL, srcFiles.toString());
    conf.set(SRC_PARENT_LABEL, parentPath.makeQualified(fs).toString());
    conf.setLong(TOTAL_SIZE_LABEL, totalSize);

    long partSize = conf.getLong(HAR_PARTSIZE_LABEL, HAR_PARTSIZE_DEFAULT);
    int numMaps = (int) (totalSize / partSize);
    // Run at least one map.
    conf.setNumMapTasks(numMaps == 0 ? 1 : numMaps);
    conf.setNumReduceTasks(1);
    conf.setOutputFormat(NullOutputFormat.class);
    conf.setMapOutputKeyClass(IntWritable.class);
    conf.setMapOutputValueClass(Text.class);
    conf.set("hadoop.job.history.user.location", "none");
    // Make sure no speculative execution is done.
    conf.setSpeculativeExecution(false);

    // Set the starting part-id offset for the mappers.
    int partId = findFirstAvailablePartId(outputPath);
    conf.setInt(PART_ID_OFFSET, partId);

    // Move the existing index aside so it can be converted and merged with the new entries.
    Path index = new Path(outputPath, HarFileSystem.INDEX_NAME);
    Path indexDirectory = new Path(outputPath, HarFileSystem.INDEX_NAME + ".copy");
    outFs.mkdirs(indexDirectory);
    Path indexCopy = new Path(indexDirectory, "data");
    outFs.rename(index, indexCopy);

    MultipleInputs.addInputPath(conf, jobDirectory,
        HArchiveInputFormat.class, HArchivesMapper.class);
    MultipleInputs.addInputPath(conf, indexDirectory,
        TextInputFormat.class, HArchivesConvertingMapper.class);
    conf.setReducerClass(HArchivesMergingReducer.class);

    JobClient.runJob(conf);
    cleanJobDirectory();
}
public static void main(String[] args) throws Exception {
    if (!validArgs(args)) {
        printUsage();
        return;
    }

    // These are the temporary paths that are created on HDFS.
    String dir1 = "/user/miyuru/csrconverter-output";
    String dir2 = "/user/miyuru/csrconverter-output-sorted";

    // First delete the temporary directories if they already exist on HDFS.
    FileSystem fs1 = FileSystem.get(new JobConf());

    System.out.println("Deleting the dir : " + dir1);
    if (fs1.exists(new Path(dir1))) {
        fs1.delete(new Path(dir1), true);
    }
    System.out.println("Done deleting the dir : " + dir1);

    System.out.println("Deleting the dir : " + dir2);
    if (fs1.exists(new Path(dir2))) {
        fs1.delete(new Path(dir2), true);
    }

    Path notinPath = new Path("/user/miyuru/notinverts/notinverts");
    if (!fs1.exists(notinPath)) {
        fs1.create(notinPath).close();
    }
    System.out.println("Done deleting the dir : " + dir2);

    // Note on Aug 23 2014: sometimes the MapReduce job hangs after this point;
    // need to see why.
    VertexCounterClient.setDefaultGraphID(args[3], args[2]);

    // The first job creates the inverted index.
    JobConf conf = new JobConf(CSRConverter.class);
    conf.set("org.acacia.partitioner.hbase.zookeeper.quorum", args[1]);
    conf.set("org.acacia.partitioner.hbase.table", args[2]);
    conf.set("org.acacia.partitioner.hbase.contacthost", args[3]);
    conf.setOutputKeyClass(LongWritable.class);
    conf.setOutputValueClass(Text.class);
    // conf.setMapperClass(InvertedMapper.class);
    conf.setReducerClass(InvertedReducer.class);
    // conf.setInputFormat(TextInputFormat.class);
    conf.setInputFormat(NLinesInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    // Both the input edge list and the "not inverted" vertices feed the same mapper.
    // FileInputFormat.setInputPaths(conf, new Path(args[0]));
    MultipleInputs.addInputPath(conf, new Path(args[0]),
        NLinesInputFormat.class, InvertedMapper.class);
    MultipleInputs.addInputPath(conf, new Path("/user/miyuru/notinverts/notinverts"),
        TextInputFormat.class, InvertedMapper.class);
    FileOutputFormat.setOutputPath(conf, new Path(dir1));

    // For the moment, turn off speculative execution.
    conf.setBoolean("mapred.map.tasks.speculative.execution", false);
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
    conf.setNumMapTasks(96);
    conf.setNumReduceTasks(96);
    conf.setPartitionerClass(VertexPartitioner.class);
    conf.set("vertex-count", args[4]);
    conf.set("zero-flag", args[5]);

    Job job = new Job(conf, "csr_inverter");
    job.setSortComparatorClass(SortComparator.class);
    job.waitForCompletion(true);
}