public void merge()

in src/java/org/apache/nutch/segment/SegmentMerger.java [624:745]


  /**
   * Runs a MapReduce job that merges the given segments into one new segment.
   *
   * <p>A fresh segment name is generated with
   * {@link Generator#generateSegmentName()} and stored in the job
   * configuration as {@code segment.merger.segmentName}; the job output path
   * is {@code out}. Of the candidate segment subdirectories (generate,
   * content, fetch, parse, parse data, parse text), only those present in
   * <em>every</em> existing input segment are added as job input. A
   * subdirectory missing from any one segment is dropped for all segments.
   * Input segments that are {@code null} or do not exist are skipped with a
   * warning. The {@code segs} array is not modified.
   *
   * @param out job output path (logged as the parent of the new segment)
   * @param segs input segment directories
   * @param filter stored as {@code segment.merger.filter} in the job
   *          configuration (presumably read by the mapper/reducer)
   * @param normalize stored as {@code segment.merger.normalizer}
   * @param slice stored as {@code segment.merger.slice}
   * @throws IOException if none of the input segments exists, if the
   *           existing segments share no candidate subdirectory, or on
   *           filesystem / job submission errors
   * @throws ClassNotFoundException propagated from the job run
   * @throws InterruptedException propagated while waiting for the job
   * @throws RuntimeException if the job completes unsuccessfully
   */
  public void merge(Path out, Path[] segs, boolean filter, boolean normalize,
          long slice) throws IOException, ClassNotFoundException, InterruptedException {
    String segmentName = Generator.generateSegmentName();
    LOG.info("Merging {} segments to {}/{}", segs.length, out, segmentName);
    Job job = Job.getInstance(getConf(), "Nutch SegmentMerger: " + out + "/" + segmentName);
    Configuration conf = job.getConfiguration();
    conf.setBoolean("segment.merger.filter", filter);
    conf.setBoolean("segment.merger.normalizer", normalize);
    conf.setLong("segment.merger.slice", slice);
    conf.set("segment.merger.segmentName", segmentName);

    // Candidate segment subdirectories, in the order they are added as job
    // input for each segment.
    final String[] dirNames = { CrawlDatum.GENERATE_DIR_NAME, Content.DIR_NAME,
        CrawlDatum.FETCH_DIR_NAME, CrawlDatum.PARSE_DIR_NAME, ParseData.DIR_NAME,
        ParseText.DIR_NAME };
    // missing[j] becomes true as soon as one existing segment lacks dirNames[j];
    // from then on that subdirectory is excluded for all segments.
    boolean[] missing = new boolean[dirNames.length];
    // Track usable segments locally rather than nulling out the caller's array.
    boolean[] usable = new boolean[segs.length];
    int usableCount = 0;

    for (int i = 0; i < segs.length; i++) {
      Path seg = segs[i];
      if (seg == null) {
        LOG.warn("Input dir at position {} is null, skipping.", i);
        continue;
      }
      FileSystem fs = seg.getFileSystem(conf);
      if (!fs.exists(seg)) {
        LOG.warn("Input dir {} doesn't exist, skipping.", seg);
        continue;
      }
      LOG.info("SegmentMerger:   adding {}", seg);
      usable[i] = true;
      usableCount++;

      StringBuilder dropped = new StringBuilder();
      for (int j = 0; j < dirNames.length; j++) {
        // Only probe subdirectories that are still common to all earlier segments.
        if (!missing[j] && !fs.exists(new Path(seg, dirNames[j]))) {
          missing[j] = true;
          dropped.append(' ').append(dirNames[j]);
        }
      }
      if (dropped.length() > 0) {
        LOG.info("{} changed input dirs, no longer using:{}", seg, dropped);
      }
    }

    // Fail fast with a clear message instead of submitting a job without inputs.
    if (usableCount == 0) {
      String message = "SegmentMerger: none of the " + segs.length
          + " input segments exist";
      LOG.error(message);
      throw new IOException(message);
    }

    StringBuilder used = new StringBuilder();
    for (int j = 0; j < dirNames.length; j++) {
      if (!missing[j]) {
        used.append(' ').append(dirNames[j]);
      }
    }
    if (used.length() == 0) {
      String message = "SegmentMerger: input segments have no segment data directory in common";
      LOG.error(message);
      throw new IOException(message);
    }
    LOG.info("SegmentMerger: using segment data from: {}", used);

    for (int i = 0; i < segs.length; i++) {
      if (!usable[i]) {
        continue;
      }
      for (int j = 0; j < dirNames.length; j++) {
        if (!missing[j]) {
          FileInputFormat.addInputPath(job, new Path(segs[i], dirNames[j]));
        }
      }
    }

    job.setInputFormatClass(ObjectInputFormat.class);
    job.setJarByClass(SegmentMerger.class);
    job.setMapperClass(SegmentMerger.SegmentMergerMapper.class);
    job.setReducerClass(SegmentMerger.SegmentMergerReducer.class);
    FileOutputFormat.setOutputPath(job, out);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(MetaWrapper.class);
    job.setOutputFormatClass(SegmentOutputFormat.class);

    // Replace this tool's configuration with the job's copy, which carries the
    // segment.merger.* settings set above.
    setConf(conf);

    try {
      boolean success = job.waitForCompletion(true);
      if (!success) {
        String message = NutchJob.getJobFailureLogMessage("SegmentMerger", job);
        LOG.error(message);
        throw new RuntimeException(message);
      }
    } catch (IOException | InterruptedException | ClassNotFoundException e) {
      LOG.error("SegmentMerger job failed: {}", e.getMessage());
      throw e;
    }
  }