public void createWebGraph()

in src/java/org/apache/nutch/scoring/webgraph/WebGraph.java [517:716]


  /**
   * Creates or updates the WebGraph, which consists of three databases stored
   * under {@code webGraphDb}: the outlink database, the inlink database and
   * the node database. Parse data (and, when {@code link.delete.gone} is set,
   * crawl fetch data) from the given segments is merged with any existing
   * outlink database; the inlink and node databases are then rebuilt from it.
   * The webgraphdb is locked for the duration of the update; on any job
   * failure the temporary output and the lock file are removed before the
   * error is rethrown.
   *
   * @param webGraphDb the WebGraph database directory to create or update
   * @param segments the segments to add to the WebGraph, may be {@code null}
   *          to only rebuild from the existing outlink database
   * @param normalize whether to apply URL normalizers to link URLs
   * @param filter whether to apply URL filters to link URLs
   * @throws IOException if a filesystem operation or a job fails
   * @throws InterruptedException if a running job is interrupted
   * @throws ClassNotFoundException if a job class cannot be resolved
   */
  public void createWebGraph(Path webGraphDb, Path[] segments,
      boolean normalize, boolean filter) throws IOException, 
      InterruptedException, ClassNotFoundException {

    StopWatch stopWatch = new StopWatch();
    stopWatch.start();
    LOG.info("WebGraphDb: starting");
    LOG.info("WebGraphDb: webgraphdb: {}", webGraphDb);
    LOG.info("WebGraphDb: URL normalize: {}", normalize);
    LOG.info("WebGraphDb: URL filter: {}", filter);

    FileSystem fs = webGraphDb.getFileSystem(getConf());

    // lock an existing webgraphdb to prevent multiple simultaneous updates
    Path lock = new Path(webGraphDb, LOCK_NAME);
    if (!fs.exists(webGraphDb)) {
      fs.mkdirs(webGraphDb);
    }

    LockUtil.createLockFile(fs, lock, false);

    // outlink and temp outlink database paths
    Path outlinkDb = new Path(webGraphDb, OUTLINK_DIR);
    Path oldOutlinkDb = new Path(webGraphDb, OLD_OUTLINK_DIR);

    if (!fs.exists(outlinkDb)) {
      fs.mkdirs(outlinkDb);
    }

    Path tempOutlinkDb = makeTempPath(outlinkDb);
    Job outlinkJob = Job.getInstance(getConf(), "Nutch WebGraph: outlinkdb " + outlinkDb);
    Configuration outlinkJobConf = outlinkJob.getConfiguration();

    boolean deleteGone = outlinkJobConf.getBoolean("link.delete.gone", false);
    boolean preserveBackup = outlinkJobConf.getBoolean("db.preserve.backup", true);

    if (deleteGone) {
      LOG.info("OutlinkDb: deleting gone links");
    }

    // get the parse data and crawl fetch data for all segments
    if (segments != null) {
      for (int i = 0; i < segments.length; i++) {
        FileSystem sfs = segments[i].getFileSystem(outlinkJobConf);
        Path parseData = new Path(segments[i], ParseData.DIR_NAME);
        if (sfs.exists(parseData)) {
          LOG.info("OutlinkDb: adding input: {}", parseData);
          FileInputFormat.addInputPath(outlinkJob, parseData);
        }

        // crawl fetch data is only needed to detect gone pages
        if (deleteGone) {
          Path crawlFetch = new Path(segments[i], CrawlDatum.FETCH_DIR_NAME);
          if (sfs.exists(crawlFetch)) {
            LOG.info("OutlinkDb: adding input: {}", crawlFetch);
            FileInputFormat.addInputPath(outlinkJob, crawlFetch);
          }
        }
      }
    }

    // add the existing webgraph
    LOG.info("OutlinkDb: adding input: {}", outlinkDb);
    FileInputFormat.addInputPath(outlinkJob, outlinkDb);

    outlinkJobConf.setBoolean(OutlinkDb.URL_NORMALIZING, normalize);
    outlinkJobConf.setBoolean(OutlinkDb.URL_FILTERING, filter);

    outlinkJob.setInputFormatClass(SequenceFileInputFormat.class);
    outlinkJob.setJarByClass(OutlinkDb.class);
    outlinkJob.setMapperClass(OutlinkDb.OutlinkDbMapper.class);
    outlinkJob.setReducerClass(OutlinkDb.OutlinkDbReducer.class);
    outlinkJob.setMapOutputKeyClass(Text.class);
    outlinkJob.setMapOutputValueClass(NutchWritable.class);
    outlinkJob.setOutputKeyClass(Text.class);
    outlinkJob.setOutputValueClass(LinkDatum.class);
    FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
    outlinkJob.setOutputFormatClass(MapFileOutputFormat.class);
    outlinkJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
        false);

    // run the outlinkdb job and replace any old outlinkdb with the new one
    try {
      LOG.info("OutlinkDb: running");
      boolean success = outlinkJob.waitForCompletion(true);
      if (!success) {
        String message = NutchJob.getJobFailureLogMessage("OutlinkDb",
            outlinkJob);
        LOG.error(message);
        NutchJob.cleanupAfterFailure(tempOutlinkDb, lock, fs);
        throw new RuntimeException(message);
      }
      LOG.info("OutlinkDb: installing {}", outlinkDb);
      // rotate: current outlinkdb becomes the backup, temp output goes live
      FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true);
      FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
      if (!preserveBackup && fs.exists(oldOutlinkDb))
        fs.delete(oldOutlinkDb, true);
      LOG.info("OutlinkDb: finished");
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
      LOG.error("OutlinkDb failed:", e);
      // remove lock file and temporary directory if an error occurs
      NutchJob.cleanupAfterFailure(tempOutlinkDb, lock, fs);
      throw e;
    }

    // inlink and temp link database paths
    Path inlinkDb = new Path(webGraphDb, INLINK_DIR);
    Path tempInlinkDb = makeTempPath(inlinkDb);

    Job inlinkJob = Job.getInstance(getConf(), "Nutch WebGraph: inlinkdb " + inlinkDb);
    Configuration inlinkJobConf = inlinkJob.getConfiguration();
    LOG.info("InlinkDb: adding input: {}", outlinkDb);
    FileInputFormat.addInputPath(inlinkJob, outlinkDb);
    inlinkJob.setInputFormatClass(SequenceFileInputFormat.class);
    inlinkJob.setJarByClass(InlinkDb.class);
    inlinkJob.setMapperClass(InlinkDb.InlinkDbMapper.class);
    inlinkJob.setMapOutputKeyClass(Text.class);
    inlinkJob.setMapOutputValueClass(LinkDatum.class);
    inlinkJob.setOutputKeyClass(Text.class);
    inlinkJob.setOutputValueClass(LinkDatum.class);
    FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
    inlinkJob.setOutputFormatClass(MapFileOutputFormat.class);
    inlinkJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
        false);

    try {

      // run the inlink and replace any old with new
      LOG.info("InlinkDb: running");
      boolean success = inlinkJob.waitForCompletion(true);
      if (!success) {
        String message = NutchJob.getJobFailureLogMessage("InlinkDb",
            inlinkJob);
        LOG.error(message);
        NutchJob.cleanupAfterFailure(tempInlinkDb, lock, fs);
        throw new RuntimeException(message);
      }
      LOG.info("InlinkDb: installing {}", inlinkDb);
      FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
      LOG.info("InlinkDb: finished");
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
      LOG.error("InlinkDb failed:", e);
      // remove lock file and temporary directory if an error occurs
      NutchJob.cleanupAfterFailure(tempInlinkDb, lock, fs);
      throw e;
    }

    // node and temp node database paths
    Path nodeDb = new Path(webGraphDb, NODE_DIR);
    Path tempNodeDb = makeTempPath(nodeDb);

    Job nodeJob = Job.getInstance(getConf(), "Nutch WebGraph: nodedb " + nodeDb);
    Configuration nodeJobConf = nodeJob.getConfiguration();
    LOG.info("NodeDb: adding input: {}", outlinkDb);
    LOG.info("NodeDb: adding input: {}", inlinkDb);
    FileInputFormat.addInputPath(nodeJob, outlinkDb);
    FileInputFormat.addInputPath(nodeJob, inlinkDb);
    nodeJob.setInputFormatClass(SequenceFileInputFormat.class);
    nodeJob.setJarByClass(NodeDb.class);
    nodeJob.setReducerClass(NodeDb.NodeDbReducer.class);
    nodeJob.setMapOutputKeyClass(Text.class);
    nodeJob.setMapOutputValueClass(LinkDatum.class);
    nodeJob.setOutputKeyClass(Text.class);
    nodeJob.setOutputValueClass(Node.class);
    FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
    nodeJob.setOutputFormatClass(MapFileOutputFormat.class);
    nodeJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
        false);

    try {

      // run the node job and replace old nodedb with new
      LOG.info("NodeDb: running");
      boolean success = nodeJob.waitForCompletion(true);
      if (!success) {
        String message = NutchJob.getJobFailureLogMessage("NodeDb", nodeJob);
        LOG.error(message);
        // remove lock file and temporary directory if an error occurs
        NutchJob.cleanupAfterFailure(tempNodeDb, lock, fs);
        throw new RuntimeException(message);
      }
      LOG.info("NodeDb: installing {}", nodeDb);
      FSUtils.replace(fs, nodeDb, tempNodeDb, true);
      LOG.info("NodeDb: finished");
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
      LOG.error("NodeDb failed:", e);
      // remove lock file and temporary directory if an error occurs
      NutchJob.cleanupAfterFailure(tempNodeDb, lock, fs);
      throw e;
    }

    // remove the lock file for the webgraph
    LockUtil.removeLockFile(fs, lock);

    stopWatch.stop();
    LOG.info("WebGraphDb: finished, elapsed: {} ms", stopWatch.getTime(
        TimeUnit.MILLISECONDS));
  }

  /**
   * Returns a sibling of the given database path with a random numeric
   * suffix, used as temporary job output before installation.
   *
   * @param db the database path to derive the temporary path from
   * @return a path of the form {@code <db>-<random int>}
   */
  private static Path makeTempPath(Path db) {
    return new Path(db + "-"
        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  }