in src/java/org/apache/nutch/segment/SegmentMerger.java [624:745]
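/**
 * Merges the given segments into a single new segment created under {@code out}.
 * Input segment directories that do not exist are skipped, and only the
 * subdirectories present in all remaining segments are used as job input.
 * The filter, normalize and slice options are handed to the map/reduce tasks
 * through the job configuration.
 *
 * @param out directory under which the merged segment is created
 * @param segs input segment directories to merge
 * @param filter whether to apply URL filters while merging
 * @param normalize whether to apply URL normalizers while merging
 * @param slice slice size, passed to the job as {@code segment.merger.slice}
 */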
public void merge(Path out, Path[] segs, boolean filter, boolean normalize,
long slice) throws IOException, ClassNotFoundException, InterruptedException {
String segmentName = Generator.generateSegmentName();
LOG.info("Merging {} segments to {}/{}", segs.length, out, segmentName);
Job job = Job.getInstance(getConf(), "Nutch SegmentMerger: " + out + "/" + segmentName);
Configuration conf = job.getConfiguration();
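// Pass the merge options to the map and reduce tasks via the job configuration.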
conf.setBoolean("segment.merger.filter", filter);
conf.setBoolean("segment.merger.normalizer", normalize);
conf.setLong("segment.merger.slice", slice);
conf.set("segment.merger.segmentName", segmentName);
// prepare the minimal common set of input dirs
boolean g = true;
boolean f = true;
boolean p = true;
boolean c = true;
boolean pd = true;
boolean pt = true;
// These hold the previous values; we use them to track changes in the loop
boolean pg = true;
boolean pf = true;
boolean pp = true;
boolean pc = true;
boolean ppd = true;
boolean ppt = true;
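// Check each input segment: skip those that don't exist and restrict the
// merge to the subdirectories present in every remaining segment.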
for (int i = 0; i < segs.length; i++) {
FileSystem fs = segs[i].getFileSystem(conf);
if (!fs.exists(segs[i])) {
if (LOG.isWarnEnabled()) {
LOG.warn("Input dir {} doesn't exist, skipping.", segs[i]);
}
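// Mark the segment as skipped; the input-path loop below checks for null.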
segs[i] = null;
continue;
}
LOG.info("SegmentMerger: adding {}", segs[i]);
Path cDir = new Path(segs[i], Content.DIR_NAME);
Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
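// A subdirectory is only kept if it has been present in every segment so far.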
c = c && fs.exists(cDir);
g = g && fs.exists(gDir);
f = f && fs.exists(fDir);
p = p && fs.exists(pDir);
pd = pd && fs.exists(pdDir);
pt = pt && fs.exists(ptDir);
// Did the common set of input dirs change with this segment?
if (g != pg || f != pf || p != pp || c != pc || pd != ppd || pt != ppt) {
LOG.info("{} changed input dirs", segs[i]);
}
pg = g; pf = f; pp = p; pc = c; ppd = pd; ppt = pt;
}
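// Log the minimal common set of segment subdirectories that will be merged.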
if (LOG.isInfoEnabled()) {
StringBuilder sb = new StringBuilder();
if (c)
sb.append(" " + Content.DIR_NAME);
if (g)
sb.append(" " + CrawlDatum.GENERATE_DIR_NAME);
if (f)
sb.append(" " + CrawlDatum.FETCH_DIR_NAME);
if (p)
sb.append(" " + CrawlDatum.PARSE_DIR_NAME);
if (pd)
sb.append(" " + ParseData.DIR_NAME);
if (pt)
sb.append(" " + ParseText.DIR_NAME);
LOG.info("SegmentMerger: using segment data from: {}", sb.toString());
}
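// Add the common subdirectories of every remaining segment as job input paths.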
for (int i = 0; i < segs.length; i++) {
if (segs[i] == null)
continue;
if (g) {
Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
FileInputFormat.addInputPath(job, gDir);
}
if (c) {
Path cDir = new Path(segs[i], Content.DIR_NAME);
FileInputFormat.addInputPath(job, cDir);
}
if (f) {
Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
FileInputFormat.addInputPath(job, fDir);
}
if (p) {
Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
FileInputFormat.addInputPath(job, pDir);
}
if (pd) {
Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
FileInputFormat.addInputPath(job, pdDir);
}
if (pt) {
Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
FileInputFormat.addInputPath(job, ptDir);
}
}
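// Wire up the merge job: segment-aware input/output formats, the merger
// mapper and reducer, and Text keys with MetaWrapper values.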
job.setInputFormatClass(ObjectInputFormat.class);
job.setJarByClass(SegmentMerger.class);
job.setMapperClass(SegmentMerger.SegmentMergerMapper.class);
job.setReducerClass(SegmentMerger.SegmentMergerReducer.class);
FileOutputFormat.setOutputPath(job, out);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MetaWrapper.class);
job.setOutputFormatClass(SegmentOutputFormat.class);
setConf(conf);
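// Run the job; treat an unsuccessful completion as a fatal error.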
try {
boolean success = job.waitForCompletion(true);
if (!success) {
String message = NutchJob.getJobFailureLogMessage("SegmentMerger", job);
LOG.error(message);
throw new RuntimeException(message);
}
} catch (IOException | InterruptedException | ClassNotFoundException e) {
LOG.error("SegmentMerger job failed: {}", e.getMessage());
throw e;
}
}
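// Example of invoking the merge programmatically: a minimal sketch, assuming
// the no-arg constructor, NutchConfiguration for setup, and illustrative
// segment paths; a non-positive slice value is assumed to disable slicing.
//
//   SegmentMerger merger = new SegmentMerger();
//   merger.setConf(NutchConfiguration.create());
//   Path out = new Path("crawl/MERGEDsegments");
//   Path[] segs = { new Path("crawl/segments/20240101000000"),
//                   new Path("crawl/segments/20240102120000") };
//   // filter and normalize URLs while merging, no slicing
//   merger.merge(out, segs, true, true, -1L);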