public int run(String[] args)

in src/java/org/apache/nutch/crawl/DeduplicationJob.java [276:389]


  public int run(String[] args) throws IOException {
    if (args.length < 1) {
      System.err.println("Usage: DeduplicationJob <crawldb> [-group <none|host|domain>] [-compareOrder <score>,<fetchTime>,<httpsOverHttp>,<urlLength>]");
      return 1;
    }

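    // Defaults: no URL grouping; duplicates are ranked by score, then
    // fetch time, then URL length.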
    String group = "none";
    Path crawlDb = new Path(args[0]);
    String compareOrder = "score,fetchTime,urlLength";

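    // Parse the optional flags. A custom -compareOrder must still name all
    // three mandatory criteria (httpsOverHttp is accepted but not required).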
    for (int i = 1; i < args.length; i++) {
      if (args[i].equals("-group")) 
        group = args[++i];
      if (args[i].equals("-compareOrder")) {
        compareOrder = args[++i];

        if (!compareOrder.contains("score")
            || !compareOrder.contains("fetchTime")
            || !compareOrder.contains("urlLength")) {
          System.err.println("DeduplicationJob: compareOrder must contain score, fetchTime and urlLength.");
          return 1;
        }
      }
    }

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    LOG.info("DeduplicationJob: starting at " + sdf.format(start));

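    // The dedup job writes to a temporary directory under the crawldb; its
    // output is merged back below and the directory is deleted at the end.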
    Path tempDir = new Path(crawlDb, "dedup-temp-"
        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

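    // First job: DBFilter groups crawldb entries by signature (per the
    // group mode above); DedupReducer marks all but the preferred entry in
    // each group as a duplicate.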
    Job job = NutchJob.getInstance(getConf());
    Configuration conf = job.getConfiguration();
    job.setJobName("Deduplication on " + crawlDb);
    conf.set(DEDUPLICATION_GROUP_MODE, group);
    conf.set(DEDUPLICATION_COMPARE_ORDER, compareOrder);
    job.setJarByClass(DeduplicationJob.class);

    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
    job.setInputFormatClass(SequenceFileInputFormat.class);

    FileOutputFormat.setOutputPath(job, tempDir);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    job.setMapOutputKeyClass(BytesWritable.class);
    job.setMapOutputValueClass(CrawlDatum.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CrawlDatum.class);

    job.setMapperClass(DBFilter.class);
    job.setReducerClass(DedupReducer.class);

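    // Run the dedup job; on failure, remove the temporary output before
    // propagating the error.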
    FileSystem fs = tempDir.getFileSystem(getConf());
    try {
      boolean success = job.waitForCompletion(true);
      if (!success) {
        String message = NutchJob.getJobFailureLogMessage("DeduplicationJob",
            job);
        LOG.error(message);
        fs.delete(tempDir, true);
        throw new RuntimeException(message);
      }
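      // Report how many documents the reducer marked as duplicates.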
      CounterGroup g = job.getCounters().getGroup("DeduplicationJobStatus");
      if (g != null) {
        Counter counter = g.findCounter("Documents marked as duplicate");
        long dups = counter.getValue();
        LOG.info("Deduplication: " + (int) dups
            + " documents marked as duplicates");
      }
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
      LOG.error("DeduplicationJob: " + StringUtils.stringifyException(e));
      fs.delete(tempDir, true);
      return -1;
    }

    // merge with existing crawl db
    LOG.info("Deduplication: Updating status of duplicate urls into crawl db.");

    Job mergeJob = CrawlDb.createJob(getConf(), crawlDb);
    FileInputFormat.addInputPath(mergeJob, tempDir);
    mergeJob.setReducerClass(StatusUpdateReducer.class);
    mergeJob.setJarByClass(DeduplicationJob.class);

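    // Lock the crawldb while its status is updated; on failure, both the
    // merge output and the lock file are cleaned up.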
    fs = crawlDb.getFileSystem(getConf());
    Path outPath = FileOutputFormat.getOutputPath(mergeJob);
    Path lock = CrawlDb.lock(getConf(), crawlDb, false);
    try {
      boolean success = mergeJob.waitForCompletion(true);
      if (!success) {
        String message = NutchJob
            .getJobFailureLogMessage("DeduplicationMergeJob", mergeJob);
        LOG.error(message);
        fs.delete(tempDir, true);
        NutchJob.cleanupAfterFailure(outPath, lock, fs);
        throw new RuntimeException(message);
      }
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
      LOG.error("DeduplicationMergeJob: " + StringUtils.stringifyException(e));
      fs.delete(tempDir, true);
      NutchJob.cleanupAfterFailure(outPath, lock, fs);
      return -1;
    }

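    // Replace the current crawldb with the merge output.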
    CrawlDb.install(mergeJob, crawlDb);

    // clean up
    fs.delete(tempDir, true);

    long end = System.currentTimeMillis();
    LOG.info("Deduplication finished at " + sdf.format(end) + ", elapsed: "
        + TimingUtil.elapsedTime(start, end));

    return 0;
  }
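
For reference, the job is typically launched from the command line (e.g. "bin/nutch dedup <crawldb> -group host"). Below is a minimal sketch of an equivalent programmatic launch using the ToolRunner/NutchConfiguration pattern that Nutch jobs conventionally follow; the main() wiring and the "crawl/crawldb" path are illustrative assumptions, not part of the excerpt above.

  // Illustrative launcher; requires org.apache.hadoop.util.ToolRunner
  // and org.apache.nutch.util.NutchConfiguration.
  public static void main(String[] args) throws Exception {
    // Deduplicate "crawl/crawldb" (hypothetical path), grouping by host.
    int res = ToolRunner.run(NutchConfiguration.create(),
        new DeduplicationJob(),
        new String[] { "crawl/crawldb", "-group", "host" });
    System.exit(res);
  }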