public void reduce()

in src/java/org/apache/nutch/segment/SegmentMerger.java [442:621]


    /**
     * Merges all values gathered for one key across the input segments and
     * writes back only the newest copy of each segment part.
     *
     * <p>"Newest" means the value whose segment name compares greatest as a
     * {@code String}. A later value replaces the current candidate only when its
     * segment name sorts strictly after the candidate's, so on a tie the first
     * value seen wins.
     *
     * <p>In the fetch directory, only datums that have a fetch status other than
     * {@code STATUS_FETCH_RETRY} or {@code STATUS_FETCH_NOTMODIFIED} are
     * considered. In the parse directory, {@code STATUS_SIGNATURE} datums are
     * tracked like the other single-valued parts. Every other parse datum is
     * grouped by its segment, and only the group with the greatest segment name
     * is kept.
     *
     * <p>When {@code mergeFilters} is non-null and rejects the merged record,
     * nothing is written. Otherwise {@code curCount} is incremented. If
     * {@code sliceSize > 0}, every emitted value is also tagged under
     * {@code SEGMENT_SLICE_KEY} with {@code curCount / sliceSize}.
     *
     * @param key the record key (presumably the URL; not visible here)
     * @param values wrapped values, each tagged with its segment part under
     *          {@code SEGMENT_PART_KEY}
     * @param context reducer context that receives the merged values
     * @throws IOException if a value has no segment-part tag, if a
     *          {@code CrawlDatum} comes from an unrecognised part directory, or
     *          if writing to the context fails
     * @throws InterruptedException if writing to the context is interrupted
     */
    public void reduce(Text key, Iterable<MetaWrapper> values,
            Context context) throws IOException, InterruptedException {
      // newest candidate per single-valued part, paired with its segment name
      CrawlDatum generated = null;
      String generatedSeg = null;
      CrawlDatum fetched = null;
      String fetchedSeg = null;
      CrawlDatum signature = null;
      String signatureSeg = null;
      Content content = null;
      String contentSeg = null;
      ParseData parseData = null;
      String parseDataSeg = null;
      ParseText parseText = null;
      String parseTextSeg = null;
      // non-signature parse datums grouped by segment; TreeMap keeps names sorted
      TreeMap<String, ArrayList<CrawlDatum>> linkedBySegment = new TreeMap<>();

      for (MetaWrapper incoming : values) {
        Object payload = incoming.get();
        String partTag = incoming.getMeta(SEGMENT_PART_KEY);
        if (partTag == null) {
          throw new IOException("Null segment part, key=" + key);
        }
        SegmentPart origin = SegmentPart.parse(partTag);

        if (payload instanceof CrawlDatum) {
          CrawlDatum datum = (CrawlDatum) payload;
          String part = origin.partName;
          String segment = origin.segmentName;
          if (part.equals(CrawlDatum.GENERATE_DIR_NAME)) {
            if (isNewerSegment(generated, generatedSeg, segment)) {
              generated = datum;
              generatedSeg = segment;
            }
          } else if (part.equals(CrawlDatum.FETCH_DIR_NAME)) {
            // skip retry / not-modified results: only genuine fetch outcomes count
            // https://issues.apache.org/jira/browse/NUTCH-1520
            // https://issues.apache.org/jira/browse/NUTCH-1113
            boolean realFetch = CrawlDatum.hasFetchStatus(datum)
                    && datum.getStatus() != CrawlDatum.STATUS_FETCH_RETRY
                    && datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED;
            if (realFetch && isNewerSegment(fetched, fetchedSeg, segment)) {
              fetched = datum;
              fetchedSeg = segment;
            }
          } else if (part.equals(CrawlDatum.PARSE_DIR_NAME)) {
            if (datum.getStatus() == CrawlDatum.STATUS_SIGNATURE) {
              if (isNewerSegment(signature, signatureSeg, segment)) {
                signature = datum;
                signatureSeg = segment;
              }
            } else {
              // keep every linked datum; the newest segment is chosen at output time
              ArrayList<CrawlDatum> bucket = linkedBySegment.get(segment);
              if (bucket == null) {
                bucket = new ArrayList<>();
                linkedBySegment.put(segment, bucket);
              }
              bucket.add(datum);
            }
          } else {
            throw new IOException("Cannot determine segment part: " + part);
          }
        } else if (payload instanceof Content) {
          if (isNewerSegment(content, contentSeg, origin.segmentName)) {
            content = (Content) payload;
            contentSeg = origin.segmentName;
          }
        } else if (payload instanceof ParseData) {
          if (isNewerSegment(parseData, parseDataSeg, origin.segmentName)) {
            parseData = (ParseData) payload;
            parseDataSeg = origin.segmentName;
          }
        } else if (payload instanceof ParseText) {
          if (isNewerSegment(parseText, parseTextSeg, origin.segmentName)) {
            parseText = (ParseText) payload;
            parseTextSeg = origin.segmentName;
          }
        }
      }

      // linked datums of the greatest segment name, or null when there are none
      ArrayList<CrawlDatum> newestLinked = linkedBySegment.isEmpty()
              ? null : linkedBySegment.lastEntry().getValue();

      // give the merge filters a chance to drop the whole merged record
      if (mergeFilters != null
              && !mergeFilters.filter(key, generated, fetched, signature, content,
                      parseData, parseText, newestLinked)) {
        return;
      }

      curCount++;
      MetaWrapper out = new MetaWrapper();
      if (sliceSize > 0) {
        out.setMeta(SEGMENT_SLICE_KEY, String.valueOf(curCount / sliceSize));
      }
      // one SegmentPart instance is reused to build every part tag
      SegmentPart target = new SegmentPart();

      if (generated != null) {
        out.set(generated);
        writePart(context, key, out, target, CrawlDatum.GENERATE_DIR_NAME, generatedSeg);
      }
      if (fetched != null) {
        out.set(fetched);
        writePart(context, key, out, target, CrawlDatum.FETCH_DIR_NAME, fetchedSeg);
      }
      if (signature != null) {
        out.set(signature);
        writePart(context, key, out, target, CrawlDatum.PARSE_DIR_NAME, signatureSeg);
      }
      if (content != null) {
        out.set(content);
        writePart(context, key, out, target, Content.DIR_NAME, contentSeg);
      }
      if (parseData != null) {
        out.set(parseData);
        writePart(context, key, out, target, ParseData.DIR_NAME, parseDataSeg);
      }
      if (parseText != null) {
        out.set(parseText);
        writePart(context, key, out, target, ParseText.DIR_NAME, parseTextSeg);
      }

      // all linked datums from the newest segment share a single part tag
      if (!linkedBySegment.isEmpty()) {
        target.partName = CrawlDatum.PARSE_DIR_NAME;
        target.segmentName = linkedBySegment.lastKey();
        out.setMeta(SEGMENT_PART_KEY, target.toString());
        for (CrawlDatum link : newestLinked) {
          out.set(link);
          context.write(key, out);
        }
      }
    }

    /**
     * Decides whether a value read from {@code candidateSeg} should replace the
     * current candidate. The answer is always yes when there is no candidate
     * yet ({@code current == null}). Otherwise it is yes only when
     * {@code candidateSeg} sorts strictly after {@code currentSeg}.
     */
    private boolean isNewerSegment(Object current, String currentSeg,
            String candidateSeg) {
      return current == null || currentSeg.compareTo(candidateSeg) < 0;
    }

    /**
     * Stores {@code partName}/{@code segmentName} in the reused {@code target},
     * writes its string form to {@code out} under {@code SEGMENT_PART_KEY}, and
     * emits {@code out} under {@code key}.
     */
    private void writePart(Context ctx, Text key, MetaWrapper out,
            SegmentPart target, String partName, String segmentName)
            throws IOException, InterruptedException {
      target.partName = partName;
      target.segmentName = segmentName;
      out.setMeta(SEGMENT_PART_KEY, target.toString());
      ctx.write(key, out);
    }