in src/java/org/apache/nutch/fetcher/FetcherThread.java [661:889]
/**
 * Finalizes the fetch of one URL: stamps {@code datum} with the given status
 * and the current time, optionally parses {@code content}, and writes the
 * datum, the content (when content storing is enabled) and every parse
 * produced from the content to the job context. When outlink following is
 * enabled, normalized and filtered outlinks are additionally queued as new
 * fetch items with an incremented depth.
 *
 * @param key
 *          URL of the fetched page; output key for the datum and the content
 * @param datum
 *          crawl datum of the page; modified in place (status, fetch time,
 *          protocol status, content type, signature)
 * @param content
 *          fetched content, or {@code null} if there is none; when
 *          {@code null} no content metadata is set and nothing is parsed
 * @param pstatus
 *          protocol status stored in the datum's metadata, or {@code null}
 *          to store none
 * @param status
 *          status code set on the datum and recorded in the content
 *          metadata; parsing and signature calculation are only attempted
 *          for {@code CrawlDatum.STATUS_FETCH_SUCCESS}
 * @param outlinkDepth
 *          depth of this page in the outlink-following chain; outlinks are
 *          only queued while this is below {@code maxOutlinkDepth}
 * @return the parse status of the parse keyed by {@code content.getUrl()},
 *         or {@code null} if nothing was parsed or the parse result holds no
 *         entry for that URL
 * @throws InterruptedException
 *           not raised directly by this method; presumably propagated from
 *           the context writes - TODO confirm
 */
private ParseStatus output(Text key, CrawlDatum datum, Content content,
    ProtocolStatus pstatus, int status, int outlinkDepth)
    throws InterruptedException {

  datum.setStatus(status);
  datum.setFetchTime(System.currentTimeMillis());
  if (pstatus != null) {
    datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
  }

  ParseResult parseResult = null;
  if (content != null) {
    Metadata metadata = content.getMetadata();

    // store the guessed content type in the crawldatum
    if (content.getContentType() != null) {
      datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
          new Text(content.getContentType()));
    }

    // add segment to metadata
    metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);

    // add score to content metadata so that ParseSegment can pick it up.
    // A failing scoring filter must not abort the output of the page.
    try {
      scfilters.passScoreBeforeParsing(key, datum, content);
    } catch (Exception e) {
      LOG.warn("{} {} Couldn't pass score, url {} ({})", getName(),
          Thread.currentThread().getId(), key, e);
    }

    if (status == CrawlDatum.STATUS_FETCH_SUCCESS) {
      // parse unless parsing is disabled or truncated content is to be
      // skipped and this content is truncated
      if (parsing && !(skipTruncated && ParseSegment.isTruncated(content))) {
        try {
          parseResult = this.parseUtil.parse(content);
        } catch (Exception e) {
          LOG.warn("{} {} Error parsing: {}: {}", getName(),
              Thread.currentThread().getId(), key,
              StringUtils.stringifyException(e));
        }
      }

      // no parse available (not attempted, skipped or failed): compute the
      // signature from the raw content and an empty parse
      if (parseResult == null && (parsing || signatureWithoutParsing)) {
        byte[] signature = SignatureFactory.getSignature(conf)
            .calculate(content, new ParseStatus().getEmptyParse(conf));
        datum.setSignature(signature);
      }
    }

    /*
     * Store status code in content So we can read this value during parsing
     * (as a separate job) and decide to parse or not.
     */
    content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
        Integer.toString(status));
  }

  try {
    context.write(key, new NutchWritable(datum));
    if (content != null && storingContent) {
      context.write(key, new NutchWritable(content));
    }

    if (parseResult != null) {
      for (Entry<Text, Parse> entry : parseResult) {
        Text url = entry.getKey();
        Parse parse = entry.getValue();
        ParseStatus parseStatus = parse.getData().getStatus();
        // NOTE(review): parseData keeps pointing at the data of the original
        // parse even if "parse" is replaced by an empty parse below - confirm
        // this is intended.
        ParseData parseData = parse.getData();

        if (!parseStatus.isSuccess()) {
          LOG.warn("{} {} Error parsing: {}: {}", getName(),
              Thread.currentThread().getId(), key, parseStatus);
          parse = parseStatus.getEmptyParse(conf);
        }

        // Calculate page signature. For non-parsing fetchers this will
        // be done in ParseSegment
        byte[] signature = SignatureFactory.getSignature(conf)
            .calculate(content, parse);

        // Ensure segment name and score are in parseData metadata
        parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
        parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
            StringUtil.toHexString(signature));
        // Pass fetch time to content meta
        parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY,
            Long.toString(datum.getFetchTime()));

        // NOTE(review): the datum has already been handed to context.write()
        // above, so this presumably only updates the in-memory instance -
        // verify.
        if (url.equals(key)) {
          datum.setSignature(signature);
        }

        try {
          scfilters.passScoreAfterParsing(url, content, parse);
        } catch (Exception e) {
          // report the URL of the parse entry the score was passed for, which
          // differs from "key" if the parse result holds multiple entries
          LOG.warn("{} {} Couldn't pass score, url {} ({})", getName(),
              Thread.currentThread().getId(), url, e);
        }

        String origin = null;

        // collect outlinks for subsequent db update
        Outlink[] links = parseData.getOutlinks();
        int outlinksToStore = Math.min(maxOutlinks, links.length);

        // the origin (domain or host of the page) is only needed to classify
        // links as internal/external; a malformed URL raises an IOException
        // which is handled by the enclosing catch
        if (ignoreExternalLinks || ignoreInternalLinks) {
          URL originURL = new URL(url.toString());
          if ("bydomain".equalsIgnoreCase(ignoreExternalLinksMode)) {
            // based on domain
            origin = URLUtil.getDomainName(originURL).toLowerCase();
          } else {
            // use host
            origin = originURL.getHost().toLowerCase();
          }
        }

        // used by fetchNode
        if (fetchNode != null) {
          fetchNode.setOutlinks(links);
          fetchNode.setTitle(parseData.getTitle());
          FetchNodeDb.getInstance().put(fetchNode.getUrl().toString(),
              fetchNode);
        }

        // Normalize and filter the outlinks, keeping at most outlinksToStore
        // of them. outlinkList keeps every accepted link in document order,
        // the set holds the distinct target URLs.
        int validCount = 0;
        List<Outlink> outlinkList = new ArrayList<>(outlinksToStore);
        HashSet<String> outlinks = new HashSet<>(outlinksToStore);
        for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
          Outlink link = links[i];
          String toUrl = link.getToUrl();
          if (toUrl.length() > maxOutlinkLength) {
            continue;
          }
          toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
              origin, ignoreInternalLinks, ignoreExternalLinks,
              ignoreExternalLinksMode, urlFiltersForOutlinks,
              urlExemptionFilters, normalizersForOutlinks);
          if (toUrl == null) {
            // rejected by filterNormalize
            continue;
          }
          validCount++;
          link.setUrl(toUrl);
          outlinkList.add(link);
          outlinks.add(toUrl);
        }

        // Publish fetch report event
        if (activatePublisher) {
          FetcherThreadEvent reportEvent = new FetcherThreadEvent(
              PublishEventType.REPORT, url.toString());
          reportEvent.addOutlinksToEventData(outlinkList);
          reportEvent.addEventData(Nutch.FETCH_EVENT_TITLE,
              parseData.getTitle());
          reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTTYPE,
              parseData.getContentMeta().get("content-type"));
          reportEvent.addEventData(Nutch.FETCH_EVENT_SCORE, datum.getScore());
          reportEvent.addEventData(Nutch.FETCH_EVENT_FETCHTIME,
              datum.getFetchTime());
          reportEvent.addEventData(Nutch.FETCH_EVENT_CONTENTLANG,
              parseData.getContentMeta().get("content-language"));
          publisher.publish(reportEvent, conf);
        }

        // Only process depth N outlinks
        if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth
            && !fetchQueues.timelimitExceeded()) {
          FetchItem ft = FetchItem.create(url, null, queueMode);
          FetchItemQueue queue = fetchQueues.getFetchItemQueue(ft.queueID);
          // mark the page itself so that links back to it are not followed
          queue.alreadyFetched.add(url.toString().hashCode());

          context.getCounter("FetcherOutlinks", "outlinks_detected")
              .increment(outlinks.size());

          // The host of the page does not change while walking over its
          // outlinks: resolve it once instead of once per outlink.
          String sourceHost = null;
          if (outlinksIgnoreExternal) {
            sourceHost = URLUtil.getHost(url.toString());
          }

          // Counter to limit num outlinks to follow per page
          int outlinkCounter = 0;

          // Walk over the outlinks and add as new FetchItem to the queues
          for (String followUrl : outlinks) {
            if (outlinkCounter >= maxOutlinkDepthNumLinks) {
              break;
            }

            // Check whether we'll follow external outlinks. The null check
            // is defensive: presumably getHost() yields null for a URL it
            // cannot parse (TODO confirm); such links are not followed.
            if (outlinksIgnoreExternal && (sourceHost == null
                || !sourceHost.equals(URLUtil.getHost(followUrl)))) {
              continue;
            }

            // Already followed?
            int urlHashCode = followUrl.hashCode();
            if (queue.alreadyFetched.contains(urlHashCode)) {
              continue;
            }
            queue.alreadyFetched.add(urlHashCode);

            // Create new FetchItem with depth incremented
            FetchItem fit = FetchItem.create(new Text(followUrl),
                new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
                queueMode, outlinkDepth + 1);

            context.getCounter("FetcherOutlinks", "outlinks_following")
                .increment(1);

            fetchQueues.addFetchItem(fit);
            outlinkCounter++;
          }
        }

        // Overwrite the outlinks in ParseData with the normalized and
        // filtered set
        parseData.setOutlinks(
            outlinkList.toArray(new Outlink[outlinkList.size()]));

        context.write(url, new NutchWritable(new ParseImpl(
            new ParseText(parse.getText()), parseData, parse.isCanonical())));
      }
    }
  } catch (IOException e) {
    // an I/O failure ends the output of this page but is not propagated
    LOG.error("fetcher caught:", e);
  }

  // return parse status (of the "original" URL if the ParseResult contains
  // multiple parses) which allows Fetcher to follow meta-redirects
  if (parseResult != null && !parseResult.isEmpty()) {
    Parse p = parseResult.get(content.getUrl());
    if (p != null) {
      context.getCounter("ParserStatus", ParseStatus.majorCodes[p
          .getData().getStatus().getMajorCode()]).increment(1);
      return p.getData().getStatus();
    }
  }
  return null;
}