in src/java/org/apache/nutch/segment/SegmentMerger.java [442:621]
/**
 * Collapses all values received for one key into at most one value per
 * segment part and writes the survivors to the context.
 *
 * <p>For the generate, fetch, signature, content, parse-data and parse-text
 * parts the value kept is the one whose segment name is greatest according to
 * {@link String#compareTo(String)}; when two values come from equally named
 * segments the one seen first is kept. Fetch datums are only considered when
 * they carry a fetch status other than {@code STATUS_FETCH_RETRY} and
 * {@code STATUS_FETCH_NOTMODIFIED}. Parse-dir datums that are not signatures
 * are grouped by segment name, and only the group belonging to the greatest
 * segment name is handed to the merge filters and written out.
 *
 * <p>If {@code mergeFilters} is set and rejects the merged record, nothing is
 * written and {@code curCount} is left unchanged. When {@code sliceSize} is
 * positive every written value is tagged with {@code SEGMENT_SLICE_KEY} set to
 * {@code curCount / sliceSize}. Values whose payload is none of the handled
 * types are ignored.
 *
 * @param key     the key shared by all {@code values}
 * @param values  the wrapped segment entries collected for {@code key}
 * @param context the sink the merged entries are written to
 * @throws IOException if a value has no {@code SEGMENT_PART_KEY} metadata, or
 *         a {@code CrawlDatum} belongs to a part that is neither the generate,
 *         fetch nor parse directory
 * @throws InterruptedException declared for {@code context.write}
 */
public void reduce(Text key, Iterable<MetaWrapper> values,
    Context context) throws IOException, InterruptedException {
  // Newest value seen so far per part, paired with the segment it came from.
  CrawlDatum newestGenerate = null;
  String newestGenerateSegment = null;
  CrawlDatum newestFetch = null;
  String newestFetchSegment = null;
  CrawlDatum newestSignature = null;
  String newestSignatureSegment = null;
  Content newestContent = null;
  String newestContentSegment = null;
  ParseData newestParseData = null;
  String newestParseDataSegment = null;
  ParseText newestParseText = null;
  String newestParseTextSegment = null;
  // Remaining parse-dir datums, grouped by segment name (sorted ascending).
  TreeMap<String, ArrayList<CrawlDatum>> linkedBySegment = new TreeMap<>();

  for (MetaWrapper incoming : values) {
    Object payload = incoming.get();
    String partSpec = incoming.getMeta(SEGMENT_PART_KEY);
    if (partSpec == null) {
      throw new IOException("Null segment part, key=" + key);
    }
    SegmentPart origin = SegmentPart.parse(partSpec);

    if (payload instanceof CrawlDatum) {
      CrawlDatum datum = (CrawlDatum) payload;
      // the part name decides which output dir the datum belongs to
      String partName = origin.partName;
      if (partName.equals(CrawlDatum.GENERATE_DIR_NAME)) {
        if (newestGenerate == null
            || newestGenerateSegment.compareTo(origin.segmentName) < 0) {
          newestGenerate = datum;
          newestGenerateSegment = origin.segmentName;
        }
      } else if (partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
        // only consider fetch status and ignore fetch retry status
        // https://issues.apache.org/jira/browse/NUTCH-1520
        // https://issues.apache.org/jira/browse/NUTCH-1113
        boolean usableFetch = CrawlDatum.hasFetchStatus(datum)
            && datum.getStatus() != CrawlDatum.STATUS_FETCH_RETRY
            && datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED;
        if (usableFetch
            && (newestFetch == null
                || newestFetchSegment.compareTo(origin.segmentName) < 0)) {
          newestFetch = datum;
          newestFetchSegment = origin.segmentName;
        }
      } else if (partName.equals(CrawlDatum.PARSE_DIR_NAME)) {
        if (datum.getStatus() == CrawlDatum.STATUS_SIGNATURE) {
          if (newestSignature == null
              || newestSignatureSegment.compareTo(origin.segmentName) < 0) {
            newestSignature = datum;
            newestSignatureSegment = origin.segmentName;
          }
        } else {
          // everything else in the parse dir is collected per segment
          ArrayList<CrawlDatum> bucket = linkedBySegment.get(origin.segmentName);
          if (bucket == null) {
            bucket = new ArrayList<>();
            linkedBySegment.put(origin.segmentName, bucket);
          }
          bucket.add(datum);
        }
      } else {
        throw new IOException("Cannot determine segment part: " + origin.partName);
      }
    } else if (payload instanceof Content) {
      if (newestContent == null
          || newestContentSegment.compareTo(origin.segmentName) < 0) {
        newestContent = (Content) payload;
        newestContentSegment = origin.segmentName;
      }
    } else if (payload instanceof ParseData) {
      if (newestParseData == null
          || newestParseDataSegment.compareTo(origin.segmentName) < 0) {
        newestParseData = (ParseData) payload;
        newestParseDataSegment = origin.segmentName;
      }
    } else if (payload instanceof ParseText) {
      if (newestParseText == null
          || newestParseTextSegment.compareTo(origin.segmentName) < 0) {
        newestParseText = (ParseText) payload;
        newestParseTextSegment = origin.segmentName;
      }
    }
  }

  // Linked datums of the greatest segment name, or null when none were seen.
  ArrayList<CrawlDatum> newestLinked = linkedBySegment.isEmpty()
      ? null
      : linkedBySegment.lastEntry().getValue();

  // let the merge filters veto the complete merged record
  if (mergeFilters != null
      && !mergeFilters.filter(key, newestGenerate, newestFetch,
          newestSignature, newestContent, newestParseData, newestParseText,
          newestLinked)) {
    return;
  }

  curCount++;
  // One wrapper and one SegmentPart are reused for every write below.
  MetaWrapper outgoing = new MetaWrapper();
  if (sliceSize > 0) {
    outgoing.setMeta(SEGMENT_SLICE_KEY, String.valueOf(curCount / sliceSize));
  }
  SegmentPart target = new SegmentPart();

  // emit the surviving value of each part, tagged with its origin
  if (newestGenerate != null) {
    target.partName = CrawlDatum.GENERATE_DIR_NAME;
    target.segmentName = newestGenerateSegment;
    outgoing.setMeta(SEGMENT_PART_KEY, target.toString());
    outgoing.set(newestGenerate);
    context.write(key, outgoing);
  }
  if (newestFetch != null) {
    target.partName = CrawlDatum.FETCH_DIR_NAME;
    target.segmentName = newestFetchSegment;
    outgoing.setMeta(SEGMENT_PART_KEY, target.toString());
    outgoing.set(newestFetch);
    context.write(key, outgoing);
  }
  if (newestSignature != null) {
    target.partName = CrawlDatum.PARSE_DIR_NAME;
    target.segmentName = newestSignatureSegment;
    outgoing.setMeta(SEGMENT_PART_KEY, target.toString());
    outgoing.set(newestSignature);
    context.write(key, outgoing);
  }
  if (newestContent != null) {
    target.partName = Content.DIR_NAME;
    target.segmentName = newestContentSegment;
    outgoing.setMeta(SEGMENT_PART_KEY, target.toString());
    outgoing.set(newestContent);
    context.write(key, outgoing);
  }
  if (newestParseData != null) {
    target.partName = ParseData.DIR_NAME;
    target.segmentName = newestParseDataSegment;
    outgoing.setMeta(SEGMENT_PART_KEY, target.toString());
    outgoing.set(newestParseData);
    context.write(key, outgoing);
  }
  if (newestParseText != null) {
    target.partName = ParseText.DIR_NAME;
    target.segmentName = newestParseTextSegment;
    outgoing.setMeta(SEGMENT_PART_KEY, target.toString());
    outgoing.set(newestParseText);
    context.write(key, outgoing);
  }
  if (newestLinked != null) {
    // all linked datums share the same part metadata, so set it once
    target.partName = CrawlDatum.PARSE_DIR_NAME;
    target.segmentName = linkedBySegment.lastKey();
    outgoing.setMeta(SEGMENT_PART_KEY, target.toString());
    for (CrawlDatum linkedDatum : newestLinked) {
      outgoing.set(linkedDatum);
      context.write(key, outgoing);
    }
  }
}