private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf());
  ArrayList<Writable> res = new ArrayList<Writable>();
  Class<?> keyClass = readers[0].getKeyClass();
  Class<?> valueClass = readers[0].getValueClass();
  if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
    throw new IOException("Incompatible key (" + keyClass.getName() + ")");
  Writable value = (Writable) valueClass.newInstance();
  // we don't know the partitioning schema
  for (int i = 0; i < readers.length; i++) {
    if (readers[i].get(key, value) != null) {
      res.add(value);
      value = (Writable) valueClass.newInstance();
      Text aKey = (Text) keyClass.newInstance();
      while (readers[i].next(aKey, value) && aKey.equals(key)) {
        res.add(value);
        value = (Writable) valueClass.newInstance();
      }
    }
    readers[i].close();
  }
  return res;
}
public static JobConf createMergeJob(Configuration config, Path linkDb,
    boolean normalize, boolean filter) {
  Path newLinkDb = new Path("linkdb-merge-"
      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

  JobConf job = new NutchJob(config);
  job.setJobName("linkdb merge " + linkDb);

  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(LinkDbFilter.class);
  job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
  job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
  job.setReducerClass(LinkDbMerger.class);

  FileOutputFormat.setOutputPath(job, newLinkDb);
  job.setOutputFormat(MapFileOutputFormat.class);
  job.setBoolean("mapred.output.compress", true);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Inlinks.class);

  // https://issues.apache.org/jira/browse/NUTCH-1069
  job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);

  return job;
}
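A hedged usage sketch for the merge job above: the caller adds each linkdb to merge as an input path, runs the job, and installs the temporary output as the new linkdb. The names LinkDbMerger.createMergeJob, LinkDb.CURRENT_NAME, and LinkDb.install(...) are assumptions drawn from the surrounding Nutch codebase, not shown in this snippet.

// Usage sketch (assumptions: dbs holds the linkdbs to merge, LinkDb.CURRENT_NAME
// names the "current" subdirectory, LinkDb.install(...) swaps in the new db).
JobConf job = LinkDbMerger.createMergeJob(getConf(), linkDb, normalize, filter);
for (Path db : dbs) {
  FileInputFormat.addInputPath(job, new Path(db, LinkDb.CURRENT_NAME));
}
JobClient.runJob(job);
LinkDb.install(job, linkDb);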
/**
 * Prints the content of the Node represented by the url to system out.
 *
 * @param webGraphDb
 *          The webgraph from which to get the node.
 * @param url
 *          The url of the node.
 *
 * @throws IOException
 *           If an error occurs while getting the node.
 */
public void dumpUrl(Path webGraphDb, String url) throws IOException {
  fs = FileSystem.get(getConf());
  nodeReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
      WebGraph.NODE_DIR), getConf());

  // open the readers, get the node, print out the info, and close the readers
  Text key = new Text(url);
  Node node = new Node();
  MapFileOutputFormat.getEntry(nodeReaders,
      new HashPartitioner<Text, Node>(), key, node);
  System.out.println(url + ":");
  System.out.println(" inlink score: " + node.getInlinkScore());
  System.out.println(" outlink score: " + node.getOutlinkScore());
  System.out.println(" num inlinks: " + node.getNumInlinks());
  System.out.println(" num outlinks: " + node.getNumOutlinks());
  FSUtils.closeReaders(nodeReaders);
}
/**
 * Prints the loopset for a single url. The loopset information will show any
 * outlink url that eventually forms a link cycle.
 *
 * @param webGraphDb The WebGraph to check for loops
 * @param url The url to check.
 *
 * @throws IOException If an error occurs while printing loopset information.
 */
public void dumpUrl(Path webGraphDb, String url) throws IOException {

  // open the readers
  fs = FileSystem.get(getConf());
  loopReaders = MapFileOutputFormat.getReaders(fs, new Path(webGraphDb,
      Loops.LOOPS_DIR), getConf());

  // get the loopset for a given url, if any
  Text key = new Text(url);
  LoopSet loop = new LoopSet();
  MapFileOutputFormat.getEntry(loopReaders,
      new HashPartitioner<Text, LoopSet>(), key, loop);

  // print out each loop url in the set
  System.out.println(url + ":");
  for (String loopUrl : loop.getLoopSet()) {
    System.out.println("  " + loopUrl);
  }

  // close the readers
  FSUtils.closeReaders(loopReaders);
}
private void openReaders(String crawlDb, JobConf config) throws IOException {
  if (readers != null)
    return;
  FileSystem fs = FileSystem.get(config);
  readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb,
      CrawlDb.CURRENT_NAME), config);
}
public CrawlDatum get(String crawlDb, String url, JobConf config)
    throws IOException {
  Text key = new Text(url);
  CrawlDatum val = new CrawlDatum();
  openReaders(crawlDb, config);
  CrawlDatum res = (CrawlDatum) MapFileOutputFormat.getEntry(readers,
      new HashPartitioner<Text, CrawlDatum>(), key, val);
  return res;
}
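A minimal usage sketch for the lookup above, assuming it lives in a class such as Nutch's CrawlDbReader (the enclosing class and crawldb path are not shown in the snippet and are assumptions here): it fetches the CrawlDatum stored for one URL and prints its fetch status.

// Hypothetical caller; class name CrawlDbReader and the crawldb path are assumptions.
JobConf config = new NutchJob(NutchConfiguration.create());
CrawlDbReader dbReader = new CrawlDbReader();
CrawlDatum datum = dbReader.get("crawl/crawldb", "http://nutch.apache.org/", config);
if (datum != null) {
  System.out.println(CrawlDatum.getStatusName(datum.getStatus()));
}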
/**
 * Runs the initializer job. The initializer job sets up the nodes with a
 * default starting score for link analysis.
 *
 * @param nodeDb
 *          The node database to use.
 * @param output
 *          The job output directory.
 *
 * @throws IOException
 *           If an error occurs while running the initializer job.
 */
private void runInitializer(Path nodeDb, Path output) throws IOException {

  // configure the initializer
  JobConf initializer = new NutchJob(getConf());
  initializer.setJobName("LinkAnalysis Initializer");
  FileInputFormat.addInputPath(initializer, nodeDb);
  FileOutputFormat.setOutputPath(initializer, output);
  initializer.setInputFormat(SequenceFileInputFormat.class);
  initializer.setMapperClass(Initializer.class);
  initializer.setMapOutputKeyClass(Text.class);
  initializer.setMapOutputValueClass(Node.class);
  initializer.setOutputKeyClass(Text.class);
  initializer.setOutputValueClass(Node.class);
  initializer.setOutputFormat(MapFileOutputFormat.class);
  initializer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
      false);

  // run the initializer
  LOG.info("Starting initialization job");
  try {
    JobClient.runJob(initializer);
  } catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw e;
  }
  LOG.info("Finished initialization job.");
}
/**
 * Runs the link analysis job. The link analysis job applies the link rank
 * formula to create a score per url and stores that score in the NodeDb.
 *
 * Typically the link analysis job is run a number of times to allow the link
 * rank scores to converge.
 *
 * @param nodeDb
 *          The node database from which we are getting previous link rank
 *          scores.
 * @param inverted
 *          The inverted inlinks
 * @param output
 *          The link analysis output.
 * @param iteration
 *          The current iteration number.
 * @param numIterations
 *          The total number of link analysis iterations
 *
 * @throws IOException
 *           If an error occurs during link analysis.
 */
private void runAnalysis(Path nodeDb, Path inverted, Path output,
    int iteration, int numIterations, float rankOne) throws IOException {

  JobConf analyzer = new NutchJob(getConf());
  analyzer.set("link.analyze.iteration", String.valueOf(iteration + 1));
  analyzer.setJobName("LinkAnalysis Analyzer, iteration " + (iteration + 1)
      + " of " + numIterations);
  FileInputFormat.addInputPath(analyzer, nodeDb);
  FileInputFormat.addInputPath(analyzer, inverted);
  FileOutputFormat.setOutputPath(analyzer, output);
  analyzer.set("link.analyze.rank.one", String.valueOf(rankOne));
  analyzer.setMapOutputKeyClass(Text.class);
  analyzer.setMapOutputValueClass(ObjectWritable.class);
  analyzer.setInputFormat(SequenceFileInputFormat.class);
  analyzer.setMapperClass(Analyzer.class);
  analyzer.setReducerClass(Analyzer.class);
  analyzer.setOutputKeyClass(Text.class);
  analyzer.setOutputValueClass(Node.class);
  analyzer.setOutputFormat(MapFileOutputFormat.class);
  analyzer.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
      false);

  LOG.info("Starting analysis job");
  try {
    JobClient.runJob(analyzer);
  } catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));
    throw e;
  }
  LOG.info("Finished analysis job.");
}
public static void main(String[] args) throws Exception {

  if (args == null || args.length < 2) {
    System.out.println("LinkDumper$Reader usage: <webgraphdb> <url>");
    return;
  }

  // open the readers for the linkdump directory
  Configuration conf = NutchConfiguration.create();
  FileSystem fs = FileSystem.get(conf);
  Path webGraphDb = new Path(args[0]);
  String url = args[1];
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
      webGraphDb, DUMP_DIR), conf);

  // get the link nodes for the url
  Text key = new Text(url);
  LinkNodes nodes = new LinkNodes();
  MapFileOutputFormat.getEntry(readers,
      new HashPartitioner<Text, LinkNodes>(), key, nodes);

  // print out the link nodes
  LinkNode[] linkNodesAr = nodes.getLinks();
  System.out.println(url + ":");
  for (LinkNode node : linkNodesAr) {
    System.out.println("  " + node.getUrl() + " - "
        + node.getNode().toString());
  }

  // close the readers
  FSUtils.closeReaders(readers);
}
/**
 * Checks the merged segment and removes the test data again.
 *
 * @param testDir
 *          the test directory
 * @param mergedSegment
 *          the merged segment
 * @return the final status
 */
protected byte checkMergedSegment(Path testDir, Path mergedSegment)
    throws Exception {
  // Get a MapFile reader for the <Text,CrawlDatum> pairs
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
      mergedSegment, CrawlDatum.FETCH_DIR_NAME), conf);

  Text key = new Text();
  CrawlDatum value = new CrawlDatum();
  byte finalStatus = 0x0;

  for (MapFile.Reader reader : readers) {
    while (reader.next(key, value)) {
      LOG.info("Reading status for: " + key.toString() + " > "
          + CrawlDatum.getStatusName(value.getStatus()));

      // Only consider fetch status
      if (CrawlDatum.hasFetchStatus(value)
          && key.toString().equals("http://nutch.apache.org/")) {
        finalStatus = value.getStatus();
      }
    }

    // Close the reader again
    reader.close();
  }

  // Remove the test directory again
  fs.delete(testDir, true);

  LOG.info("Final fetch status for: http://nutch.apache.org/ > "
      + CrawlDatum.getStatusName(finalStatus));

  // Return the final status
  return finalStatus;
}
@Test
public void testLargeMerge() throws Exception {
  SegmentMerger merger = new SegmentMerger(conf);
  merger.merge(out, new Path[] { seg1, seg2 }, false, false, -1);
  // verify output
  FileStatus[] stats = fs.listStatus(out);
  // there should be just one path
  Assert.assertEquals(1, stats.length);
  Path outSeg = stats[0].getPath();
  Text k = new Text();
  ParseText v = new ParseText();
  MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, new Path(
      outSeg, ParseText.DIR_NAME), conf);
  int cnt1 = 0, cnt2 = 0;
  for (MapFile.Reader r : readers) {
    while (r.next(k, v)) {
      String ks = k.toString();
      String vs = v.getText();
      if (ks.startsWith("seg1-")) {
        cnt1++;
        Assert.assertTrue(vs.startsWith("seg1 "));
      } else if (ks.startsWith("seg2-")) {
        cnt2++;
        Assert.assertTrue(vs.startsWith("seg2 "));
      }
    }
    r.close();
  }
  Assert.assertEquals(countSeg1, cnt1);
  Assert.assertEquals(countSeg2, cnt2);
}
/**
 * Extracts CF for each found anchor.
 *
 * @param inputPath
 * @param mapPath
 * @param outputPath
 * @throws IOException
 */
private void task3(String inputPath, String mapPath, String outputPath)
    throws IOException {
  LOG.info("Extracting anchor text (phase 3)...");
  LOG.info(" - input: " + inputPath);
  LOG.info(" - output: " + outputPath);
  LOG.info(" - mapping: " + mapPath);

  JobConf conf = new JobConf(getConf(), ExtractWikipediaAnchorText.class);
  conf.setJobName(String.format(
      "ExtractWikipediaAnchorText:phase3[input: %s, output: %s]", inputPath,
      outputPath));

  conf.setNumReduceTasks(1);

  String location = "map.dat";

  try {
    DistributedCache.addCacheFile(new URI(mapPath + "/part-00000/data" + "#"
        + location), conf);
    // DistributedCache.addCacheFile(new URI(mapPath + "/singleentitymap.data" + "#" + location), conf);
    DistributedCache.createSymlink(conf);
  } catch (URISyntaxException e) {
    e.printStackTrace();
  }

  FileInputFormat.addInputPath(conf, new Path(inputPath));
  FileOutputFormat.setOutputPath(conf, new Path(outputPath));

  conf.setInputFormat(SequenceFileInputFormat.class);
  conf.setOutputFormat(MapFileOutputFormat.class);
  // conf.setOutputFormat(TextOutputFormat.class);

  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(IntWritable.class);

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(IntWritable.class);

  conf.setMapperClass(MyMapper3.class);
  conf.setCombinerClass(MyReducer3.class);
  conf.setReducerClass(MyReducer3.class);

  JobClient.runJob(conf);
}
/**
 * Maps from (targetID, (anchor, count)) to (anchor, (targetID, count)).
 *
 * @param inputPath
 * @param outputPath
 * @throws IOException
 */
private void task4(String inputPath, String outputPath) throws IOException {
  LOG.info("Extracting anchor text (phase 4)...");
  LOG.info(" - input: " + inputPath);
  LOG.info(" - output: " + outputPath);

  JobConf conf = new JobConf(getConf(), ExtractWikipediaAnchorText.class);
  conf.setJobName(String.format(
      "ExtractWikipediaAnchorText:phase4[input: %s, output: %s]", inputPath,
      outputPath));

  conf.setNumReduceTasks(1);

  // FileInputFormat.addInputPath(conf, new Path(inputPath + "/part-00000/data"));
  FileInputFormat.addInputPath(conf, new Path(inputPath + "/part-*/data"));
  FileOutputFormat.setOutputPath(conf, new Path(outputPath));

  conf.setInputFormat(SequenceFileInputFormat.class);
  conf.setOutputFormat(MapFileOutputFormat.class);

  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(HMapSIW.class);

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(HMapSIW.class);

  conf.setMapperClass(MyMapper4.class);
  conf.setReducerClass(MyReducer4.class);

  JobClient.runJob(conf);
}
public CrawlDatum get(String crawlDb, String url, Configuration config)
    throws IOException {
  Text key = new Text(url);
  CrawlDatum val = new CrawlDatum();
  openReaders(crawlDb, config);
  CrawlDatum res = (CrawlDatum) MapFileOutputFormat.getEntry(readers,
      new HashPartitioner<Text, CrawlDatum>(), key, val);
  return res;
}
public void processDumpJob(String crawlDb, String output,
    Configuration config, String format, String regex, String status)
    throws IOException {
  if (LOG.isInfoEnabled()) {
    LOG.info("CrawlDb dump: starting");
    LOG.info("CrawlDb db: " + crawlDb);
  }

  Path outFolder = new Path(output);

  JobConf job = new NutchJob(config);
  job.setJobName("dump " + crawlDb);

  FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);
  FileOutputFormat.setOutputPath(job, outFolder);

  if (format.equals("csv")) {
    job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
  } else if (format.equals("crawldb")) {
    job.setOutputFormat(MapFileOutputFormat.class);
  } else {
    job.setOutputFormat(TextOutputFormat.class);
  }

  if (status != null)
    job.set("status", status);
  if (regex != null)
    job.set("regex", regex);

  job.setMapperClass(CrawlDbDumpMapper.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(CrawlDatum.class);

  JobClient.runJob(job);
  if (LOG.isInfoEnabled()) {
    LOG.info("CrawlDb dump: done");
  }
}
private void createRankingsTableDirectly() throws IOException,
    URISyntaxException {

  log.info("Creating table rankings...");

  Path fout = new Path(options.getResultPath(), RANKINGS);

  JobConf job = new JobConf(HiveData.class);
  String jobname = "Create rankings";

  /**
   * TODO: change another more effective way as this operation may cause
   * about 2 min delay (originally ~15min in total)
   */
  setRankingsOptions(job);
  job.setJobName(jobname);
  job.set("mapred.reduce.slowstart.completed.maps", "0.3");
  job.set("mapreduce.job.reduce.slowstart.completedmaps", "0.3");

  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(Text.class);

  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(JoinBytesInt.class);

  job.setJarByClass(DummyToRankingsMapper.class);
  job.setJarByClass(JoinBytesIntCombiner.class);
  job.setJarByClass(GenerateRankingsReducer.class);

  job.setMapperClass(DummyToRankingsMapper.class);
  job.setCombinerClass(JoinBytesIntCombiner.class);
  job.setReducerClass(GenerateRankingsReducer.class);

  if (options.getNumReds() > 0) {
    job.setNumReduceTasks(options.getNumReds());
  } else {
    job.setNumReduceTasks(Utils.getMaxNumReds());
  }

  job.setInputFormat(NLineInputFormat.class);
  FileInputFormat.setInputPaths(job, dummy.getPath());

  job.set("mapred.map.output.compression.type", "BLOCK");
  job.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
  MapFileOutputFormat.setCompressOutput(job, true);
  // MapFileOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.LzoCodec.class);
  MapFileOutputFormat.setOutputCompressorClass(job,
      org.apache.hadoop.io.compress.DefaultCodec.class);

  if (options.isSequenceOut()) {
    job.setOutputFormat(SequenceFileOutputFormat.class);
  } else {
    job.setOutputFormat(TextOutputFormat.class);
  }

  if (null != options.getCodecClass()) {
    job.set("mapred.output.compression.type", "BLOCK");
    job.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, options.getCodecClass());
  }

  FileOutputFormat.setOutputPath(job, fout);

  log.info("Running Job: " + jobname);
  log.info("Pages file " + dummy.getPath() + " as input");
  log.info("Rankings file " + fout + " as output");
  JobClient.runJob(job);
  log.info("Finished Running Job: " + jobname);
}
private void createNutchUrls() throws IOException, URISyntaxException {

  log.info("Creating nutch urls ...");

  JobConf job = new JobConf(NutchData.class);
  Path urls = new Path(options.getWorkPath(), URLS_DIR_NAME);
  Utils.checkHdfsPath(urls);
  String jobname = "Create nutch urls";
  job.setJobName(jobname);
  setNutchOptions(job);

  FileInputFormat.setInputPaths(job, dummy.getPath());
  job.setInputFormat(NLineInputFormat.class);

  job.setMapperClass(CreateUrlHash.class);
  job.setNumReduceTasks(0);

  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(Text.class);

  job.setOutputFormat(MapFileOutputFormat.class);
  job.setOutputKeyClass(LongWritable.class);
  job.setOutputValueClass(Text.class);

  MapFileOutputFormat.setOutputPath(job, urls);
  // SequenceFileOutputFormat.setOutputPath(job, fout);

  /*
   * SequenceFileOutputFormat.setCompressOutput(job, true);
   * SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
   * SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
   */

  log.info("Running Job: " + jobname);
  log.info("Pages file " + dummy.getPath() + " as input");
  log.info("Rankings file " + urls + " as output");
  JobClient.runJob(job);
  log.info("Finished Running Job: " + jobname);

  log.info("Cleaning temp files...");
  Utils.cleanTempFiles(urls);
}
public void processDumpJob(String crawlDb, String output, JobConf config,
    String format, String regex, String status, Integer retry, String expr)
    throws IOException {
  if (LOG.isInfoEnabled()) {
    LOG.info("CrawlDb dump: starting");
    LOG.info("CrawlDb db: " + crawlDb);
  }

  Path outFolder = new Path(output);

  JobConf job = new NutchJob(config);
  job.setJobName("dump " + crawlDb);

  FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);
  FileOutputFormat.setOutputPath(job, outFolder);

  if (format.equals("csv")) {
    job.setOutputFormat(CrawlDatumCsvOutputFormat.class);
  } else if (format.equals("crawldb")) {
    job.setOutputFormat(MapFileOutputFormat.class);
  } else {
    job.setOutputFormat(TextOutputFormat.class);
  }

  if (status != null)
    job.set("status", status);
  if (regex != null)
    job.set("regex", regex);
  if (retry != null)
    job.setInt("retry", retry);
  if (expr != null) {
    job.set("expr", expr);
    LOG.info("CrawlDb db: expr: " + expr);
  }

  job.setMapperClass(CrawlDbDumpMapper.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(CrawlDatum.class);

  JobClient.runJob(job);
  if (LOG.isInfoEnabled()) {
    LOG.info("CrawlDb dump: done");
  }
}
/**
 * Updates the inlink score from the web graph node database into the crawl
 * database.
 *
 * @param crawlDb
 *          The crawl database to update
 * @param webGraphDb
 *          The webgraph database to use.
 *
 * @throws IOException
 *           If an error occurs while updating the scores.
 */
public void update(Path crawlDb, Path webGraphDb) throws IOException {

  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("ScoreUpdater: starting at " + sdf.format(start));

  Configuration conf = getConf();
  FileSystem fs = FileSystem.get(conf);

  // create a temporary crawldb with the new scores
  LOG.info("Running crawldb update " + crawlDb);
  Path nodeDb = new Path(webGraphDb, WebGraph.NODE_DIR);
  Path crawlDbCurrent = new Path(crawlDb, CrawlDb.CURRENT_NAME);
  Path newCrawlDb = new Path(crawlDb,
      Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

  // run the updater job outputting to the temp crawl database
  JobConf updater = new NutchJob(conf);
  updater.setJobName("Update CrawlDb from WebGraph");
  FileInputFormat.addInputPath(updater, crawlDbCurrent);
  FileInputFormat.addInputPath(updater, nodeDb);
  FileOutputFormat.setOutputPath(updater, newCrawlDb);
  updater.setInputFormat(SequenceFileInputFormat.class);
  updater.setMapperClass(ScoreUpdater.class);
  updater.setReducerClass(ScoreUpdater.class);
  updater.setMapOutputKeyClass(Text.class);
  updater.setMapOutputValueClass(ObjectWritable.class);
  updater.setOutputKeyClass(Text.class);
  updater.setOutputValueClass(CrawlDatum.class);
  updater.setOutputFormat(MapFileOutputFormat.class);

  try {
    JobClient.runJob(updater);
  } catch (IOException e) {
    LOG.error(StringUtils.stringifyException(e));

    // remove the temp crawldb on error
    if (fs.exists(newCrawlDb)) {
      fs.delete(newCrawlDb, true);
    }
    throw e;
  }

  // install the temp crawl database
  LOG.info("ScoreUpdater: installing new crawldb " + crawlDb);
  CrawlDb.install(updater, crawlDb);

  long end = System.currentTimeMillis();
  LOG.info("ScoreUpdater: finished at " + sdf.format(end) + ", elapsed: "
      + TimingUtil.elapsedTime(start, end));
}
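A hedged driver sketch for the update above: ScoreUpdater (the class named as mapper and reducer in the job setup) is assumed here to be a Configured Nutch tool, so it can be wired up with a configuration and invoked directly.

// Hypothetical driver; assumes ScoreUpdater extends Configured, as Nutch tools usually do.
ScoreUpdater scoreUpdater = new ScoreUpdater();
scoreUpdater.setConf(NutchConfiguration.create());
scoreUpdater.update(new Path("crawl/crawldb"), new Path("crawl/webgraphdb"));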
private Writable getEntry(MapFile.Reader[] readers, Text url, Writable entry)
    throws IOException {
  return MapFileOutputFormat.getEntry(readers, PARTITIONER, url, entry);
}
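The PARTITIONER field used above is not part of the snippet; a minimal declaration sketch, assuming the reading side should partition Text keys the same way the writing job did (hash partitioning over the old mapred API):

// Assumed field declaration; the concrete partitioner must match the one used when
// the MapFiles were written, otherwise getEntry() probes the wrong part file.
private static final Partitioner<Text, Writable> PARTITIONER =
    new HashPartitioner<Text, Writable>();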
/**
 * Maps from (srcID, (targetID, anchor)) to (targetID, (anchor, count)).
 *
 * @param inputPath
 * @param outputPath
 * @param redirPath
 * @throws IOException
 */
private void task2(String inputPath, String outputPath, String redirPath)
    throws IOException {
  LOG.info("Extracting anchor text (phase 2)...");
  LOG.info(" - input: " + inputPath);
  LOG.info(" - output: " + outputPath);

  Random r = new Random();
  // String tmpOutput = "tmp-" + this.getClass().getCanonicalName() + "-" + r.nextInt(10000);
  // LOG.info("intermediate folder for merge " + tmpOutput);

  JobConf conf = new JobConf(getConf(), ExtractWikipediaAnchorText.class);
  conf.setJobName(String.format(
      "ExtractWikipediaAnchorText:phase2[input: %s, output: %s]", inputPath,
      outputPath));

  // Gathers everything together for convenience; feasible for Wikipedia.
  conf.setNumReduceTasks(1);

  try {
    DistributedCache.addCacheFile(new URI(redirPath + "/part-00000" + "#"
        + "redirs.dat"), conf);
    DistributedCache.createSymlink(conf);
  } catch (URISyntaxException e) {
    e.printStackTrace();
  }

  FileInputFormat.addInputPath(conf, new Path(inputPath));
  FileOutputFormat.setOutputPath(conf, new Path(outputPath));
  // FileOutputFormat.setOutputPath(conf, new Path(tmpOutput));

  conf.setInputFormat(SequenceFileInputFormat.class);
  conf.setOutputFormat(MapFileOutputFormat.class);
  // conf.setOutputFormat(TextOutputFormat.class);

  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(Text.class);

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(HMapSIW.class);

  conf.setMapperClass(MyMapper2.class);
  conf.setReducerClass(MyReducer2.class);

  // Delete the output directory if it exists already.
  FileSystem.get(conf).delete(new Path(outputPath), true);

  JobClient.runJob(conf);

  // Clean up intermediate data.
  FileSystem.get(conf).delete(new Path(inputPath), true);

  /*
   * // merge
   * String finalO = outputPath + "/part-00000/data";
   * FileSystem.get(conf).mkdirs(new Path(outputPath + "part-00000"));
   * getMergeInHdfs(tmpOutput, finalO, conf);
   * FileSystem.get(conf).delete(new Path(tmpOutput), true);
   */
}
private void openReaders(String crawlDb, Configuration config)
    throws IOException {
  if (readers != null)
    return;
  FileSystem fs = FileSystem.get(config);
  readers = MapFileOutputFormat.getReaders(fs, new Path(crawlDb,
      CrawlDb.CURRENT_NAME), config);
}
private void scan(Path crawlDb, Path outputPath, String regex, String status,
    boolean text) throws IOException {

  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("CrawlDB scanner: starting at " + sdf.format(start));

  JobConf job = new NutchJob(getConf());

  job.setJobName("Scan : " + crawlDb + " for URLS matching : " + regex);

  job.set("CrawlDBScanner.regex", regex);
  if (status != null)
    job.set("CrawlDBScanner.status", status);

  FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(CrawlDBScanner.class);
  job.setReducerClass(CrawlDBScanner.class);

  FileOutputFormat.setOutputPath(job, outputPath);

  // if we want a text dump of the entries in order to check something -
  // better to use the text format and avoid compression
  if (text) {
    job.set("mapred.output.compress", "false");
    job.setOutputFormat(TextOutputFormat.class);
  }
  // otherwise what we will actually create is a mini-crawlDB which can then
  // be used for debugging
  else {
    job.setOutputFormat(MapFileOutputFormat.class);
  }

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(CrawlDatum.class);

  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(CrawlDatum.class);

  try {
    JobClient.runJob(job);
  } catch (IOException e) {
    throw e;
  }

  long end = System.currentTimeMillis();
  LOG.info("CrawlDb scanner: finished at " + sdf.format(end) + ", elapsed: "
      + TimingUtil.elapsedTime(start, end));
}