in src/java/org/apache/nutch/scoring/webgraph/WebGraph.java [517:716]
/**
 * Creates or updates the WebGraph database from the given segments. Three
 * databases are built in sequence, each feeding the next: the OutlinkDb
 * (links from the segments' parse data, merged with any existing outlink
 * database), the InlinkDb (outlinks inverted), and the NodeDb (per-URL
 * aggregation of both link databases). The webgraphdb directory is locked
 * for the duration of the update to prevent concurrent modification; the
 * lock and any temporary job output are removed on failure.
 *
 * @param webGraphDb the WebGraph database directory to create or update
 * @param segments segments whose parse data (and, if gone-link deletion is
 *          enabled, fetch data) is added; may be null to rebuild from the
 *          existing databases only
 * @param normalize whether to run URL normalizers on link URLs
 * @param filter whether to run URL filters on link URLs
 * @throws IOException if a filesystem or job error occurs
 * @throws InterruptedException if a job is interrupted
 * @throws ClassNotFoundException if a job class cannot be loaded
 */
public void createWebGraph(Path webGraphDb, Path[] segments,
    boolean normalize, boolean filter) throws IOException,
    InterruptedException, ClassNotFoundException {
  StopWatch stopWatch = new StopWatch();
  stopWatch.start();
  LOG.info("WebGraphDb: starting");
  LOG.info("WebGraphDb: webgraphdb: {}", webGraphDb);
  LOG.info("WebGraphDb: URL normalize: {}", normalize);
  LOG.info("WebGraphDb: URL filter: {}", filter);
  FileSystem fs = webGraphDb.getFileSystem(getConf());
  // lock an existing webgraphdb to prevent multiple simultaneous updates
  Path lock = new Path(webGraphDb, LOCK_NAME);
  if (!fs.exists(webGraphDb)) {
    fs.mkdirs(webGraphDb);
  }
  LockUtil.createLockFile(fs, lock, false);

  // build the three databases in dependency order: outlinks first,
  // inlinks inverted from the outlinks, nodes aggregated from both
  Path outlinkDb = createOutlinkDb(webGraphDb, segments, normalize, filter,
      fs, lock);
  Path inlinkDb = createInlinkDb(webGraphDb, outlinkDb, fs, lock);
  createNodeDb(webGraphDb, outlinkDb, inlinkDb, fs, lock);

  // remove the lock file for the webgraph
  LockUtil.removeLockFile(fs, lock);
  stopWatch.stop();
  LOG.info("WebGraphDb: finished, elapsed: {} ms", stopWatch.getTime(
      TimeUnit.MILLISECONDS));
}

/**
 * Builds the outlink database from the segments' parse data (plus fetch
 * data when gone-link deletion is enabled), merged with the existing
 * outlink database, and installs it, keeping the previous database as a
 * backup unless {@code db.preserve.backup} is false.
 *
 * @return the path of the installed outlink database
 */
private Path createOutlinkDb(Path webGraphDb, Path[] segments,
    boolean normalize, boolean filter, FileSystem fs, Path lock)
    throws IOException, InterruptedException, ClassNotFoundException {
  // outlink and temp outlink database paths
  Path outlinkDb = new Path(webGraphDb, OUTLINK_DIR);
  Path oldOutlinkDb = new Path(webGraphDb, OLD_OUTLINK_DIR);
  if (!fs.exists(outlinkDb)) {
    fs.mkdirs(outlinkDb);
  }
  Path tempOutlinkDb = new Path(outlinkDb + "-"
      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  Job outlinkJob = Job.getInstance(getConf(),
      "Nutch WebGraph: outlinkdb " + outlinkDb);
  Configuration outlinkJobConf = outlinkJob.getConfiguration();
  boolean deleteGone = outlinkJobConf.getBoolean("link.delete.gone", false);
  boolean preserveBackup = outlinkJobConf.getBoolean("db.preserve.backup",
      true);
  if (deleteGone) {
    LOG.info("OutlinkDb: deleting gone links");
  }
  // get the parse data and crawl fetch data for all segments
  if (segments != null) {
    for (Path segment : segments) {
      FileSystem sfs = segment.getFileSystem(outlinkJobConf);
      Path parseData = new Path(segment, ParseData.DIR_NAME);
      if (sfs.exists(parseData)) {
        LOG.info("OutlinkDb: adding input: {}", parseData);
        FileInputFormat.addInputPath(outlinkJob, parseData);
      }
      if (deleteGone) {
        // fetch status is needed to recognize gone pages
        Path crawlFetch = new Path(segment, CrawlDatum.FETCH_DIR_NAME);
        if (sfs.exists(crawlFetch)) {
          LOG.info("OutlinkDb: adding input: {}", crawlFetch);
          FileInputFormat.addInputPath(outlinkJob, crawlFetch);
        }
      }
    }
  }
  // add the existing webgraph
  LOG.info("OutlinkDb: adding input: {}", outlinkDb);
  FileInputFormat.addInputPath(outlinkJob, outlinkDb);
  outlinkJobConf.setBoolean(OutlinkDb.URL_NORMALIZING, normalize);
  outlinkJobConf.setBoolean(OutlinkDb.URL_FILTERING, filter);
  outlinkJob.setInputFormatClass(SequenceFileInputFormat.class);
  outlinkJob.setJarByClass(OutlinkDb.class);
  outlinkJob.setMapperClass(OutlinkDb.OutlinkDbMapper.class);
  outlinkJob.setReducerClass(OutlinkDb.OutlinkDbReducer.class);
  outlinkJob.setMapOutputKeyClass(Text.class);
  outlinkJob.setMapOutputValueClass(NutchWritable.class);
  outlinkJob.setOutputKeyClass(Text.class);
  outlinkJob.setOutputValueClass(LinkDatum.class);
  FileOutputFormat.setOutputPath(outlinkJob, tempOutlinkDb);
  outlinkJob.setOutputFormatClass(MapFileOutputFormat.class);
  outlinkJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
      false);
  // run the outlinkdb job and replace any old outlinkdb with the new one
  try {
    LOG.info("OutlinkDb: running");
    boolean success = outlinkJob.waitForCompletion(true);
    if (!success) {
      String message = NutchJob.getJobFailureLogMessage("OutlinkDb",
          outlinkJob);
      LOG.error(message);
      NutchJob.cleanupAfterFailure(tempOutlinkDb, lock, fs);
      throw new RuntimeException(message);
    }
    LOG.info("OutlinkDb: installing {}", outlinkDb);
    // keep the current db as backup, then swap in the freshly built one
    FSUtils.replace(fs, oldOutlinkDb, outlinkDb, true);
    FSUtils.replace(fs, outlinkDb, tempOutlinkDb, true);
    if (!preserveBackup && fs.exists(oldOutlinkDb)) {
      fs.delete(oldOutlinkDb, true);
    }
    LOG.info("OutlinkDb: finished");
  } catch (IOException | InterruptedException | ClassNotFoundException e) {
    LOG.error("OutlinkDb failed:", e);
    // remove lock file and temporary directory if an error occurs
    NutchJob.cleanupAfterFailure(tempOutlinkDb, lock, fs);
    throw e;
  }
  return outlinkDb;
}

/**
 * Builds the inlink database by inverting the outlink database and
 * installs it, replacing any previous inlink database.
 *
 * @return the path of the installed inlink database
 */
private Path createInlinkDb(Path webGraphDb, Path outlinkDb, FileSystem fs,
    Path lock)
    throws IOException, InterruptedException, ClassNotFoundException {
  // inlink and temp link database paths
  Path inlinkDb = new Path(webGraphDb, INLINK_DIR);
  Path tempInlinkDb = new Path(inlinkDb + "-"
      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  Job inlinkJob = Job.getInstance(getConf(),
      "Nutch WebGraph: inlinkdb " + inlinkDb);
  Configuration inlinkJobConf = inlinkJob.getConfiguration();
  LOG.info("InlinkDb: adding input: {}", outlinkDb);
  FileInputFormat.addInputPath(inlinkJob, outlinkDb);
  inlinkJob.setInputFormatClass(SequenceFileInputFormat.class);
  inlinkJob.setJarByClass(InlinkDb.class);
  inlinkJob.setMapperClass(InlinkDb.InlinkDbMapper.class);
  inlinkJob.setMapOutputKeyClass(Text.class);
  inlinkJob.setMapOutputValueClass(LinkDatum.class);
  inlinkJob.setOutputKeyClass(Text.class);
  inlinkJob.setOutputValueClass(LinkDatum.class);
  FileOutputFormat.setOutputPath(inlinkJob, tempInlinkDb);
  inlinkJob.setOutputFormatClass(MapFileOutputFormat.class);
  inlinkJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
      false);
  try {
    // run the inlink and replace any old with new
    LOG.info("InlinkDb: running");
    boolean success = inlinkJob.waitForCompletion(true);
    if (!success) {
      String message = NutchJob.getJobFailureLogMessage("InlinkDb",
          inlinkJob);
      LOG.error(message);
      NutchJob.cleanupAfterFailure(tempInlinkDb, lock, fs);
      throw new RuntimeException(message);
    }
    LOG.info("InlinkDb: installing {}", inlinkDb);
    FSUtils.replace(fs, inlinkDb, tempInlinkDb, true);
    LOG.info("InlinkDb: finished");
  } catch (IOException | InterruptedException | ClassNotFoundException e) {
    LOG.error("InlinkDb failed:", e);
    // remove lock file and temporary directory if an error occurs
    NutchJob.cleanupAfterFailure(tempInlinkDb, lock, fs);
    throw e;
  }
  return inlinkDb;
}

/**
 * Builds the node database by aggregating the outlink and inlink databases
 * per URL and installs it, replacing any previous node database.
 */
private void createNodeDb(Path webGraphDb, Path outlinkDb, Path inlinkDb,
    FileSystem fs, Path lock)
    throws IOException, InterruptedException, ClassNotFoundException {
  // node and temp node database paths
  Path nodeDb = new Path(webGraphDb, NODE_DIR);
  Path tempNodeDb = new Path(nodeDb + "-"
      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
  Job nodeJob = Job.getInstance(getConf(), "Nutch WebGraph: nodedb " + nodeDb);
  Configuration nodeJobConf = nodeJob.getConfiguration();
  LOG.info("NodeDb: adding input: {}", outlinkDb);
  LOG.info("NodeDb: adding input: {}", inlinkDb);
  FileInputFormat.addInputPath(nodeJob, outlinkDb);
  FileInputFormat.addInputPath(nodeJob, inlinkDb);
  nodeJob.setInputFormatClass(SequenceFileInputFormat.class);
  nodeJob.setJarByClass(NodeDb.class);
  nodeJob.setReducerClass(NodeDb.NodeDbReducer.class);
  nodeJob.setMapOutputKeyClass(Text.class);
  nodeJob.setMapOutputValueClass(LinkDatum.class);
  nodeJob.setOutputKeyClass(Text.class);
  nodeJob.setOutputValueClass(Node.class);
  FileOutputFormat.setOutputPath(nodeJob, tempNodeDb);
  nodeJob.setOutputFormatClass(MapFileOutputFormat.class);
  nodeJobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
      false);
  try {
    // run the node job and replace old nodedb with new
    LOG.info("NodeDb: running");
    boolean success = nodeJob.waitForCompletion(true);
    if (!success) {
      String message = NutchJob.getJobFailureLogMessage("NodeDb", nodeJob);
      LOG.error(message);
      NutchJob.cleanupAfterFailure(tempNodeDb, lock, fs);
      throw new RuntimeException(message);
    }
    LOG.info("NodeDb: installing {}", nodeDb);
    FSUtils.replace(fs, nodeDb, tempNodeDb, true);
    LOG.info("NodeDb: finished");
  } catch (IOException | InterruptedException | ClassNotFoundException e) {
    LOG.error("NodeDb failed:", e);
    // remove lock file and temporary directory if an error occurs
    NutchJob.cleanupAfterFailure(tempNodeDb, lock, fs);
    throw e;
  }
}