in src/java/org/apache/nutch/fetcher/FetcherThread.java [248:533]
/**
 * Main loop of a fetcher thread: polls {@link FetchItem}s from the shared
 * fetch queues, enforces robots.txt rules (deferred visits, denial,
 * crawl-delay), fetches each URL through the matching {@link Protocol}
 * implementation, records the outcome (output, counters, publisher events),
 * and follows redirects up to {@code maxRedirect} times. The thread
 * terminates when it is halted or when the feeder thread has finished and
 * all queues are drained; {@code activeThreads} is kept accurate via the
 * finally block.
 */
public void run() {
  activeThreads.incrementAndGet(); // count threads

  FetchItem fit = null;
  try {
    // checking for the server to be running and fetcher.parse to be true
    if (parsing && NutchServer.getInstance().isRunning())
      reportToNutchServer = true;

    while (true) {
      // creating FetchNode for storing in FetchNodeDb
      if (reportToNutchServer)
        this.fetchNode = new FetchNode();
      else
        this.fetchNode = null;

      // check whether must be stopped
      if (isHalted()) {
        LOG.debug("{} set to halted", getName());
        fit = null;
        return;
      }

      fit = fetchQueues.getFetchItem();
      if (fit == null) {
        if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
          LOG.debug("{} spin-waiting ...", getName());

          // spin-wait.
          spinWaiting.incrementAndGet();
          try {
            Thread.sleep(500);
          } catch (InterruptedException e) {
            // FIX: do not swallow the interrupt — restore the interrupt
            // status so a shutdown/interrupt request is not silently lost
            Thread.currentThread().interrupt();
          }
          spinWaiting.decrementAndGet();
          continue;
        } else {
          // all done, finish this thread
          LOG.info("{} {} has no more work available", getName(),
              Thread.currentThread().getId());
          return;
        }
      }

      lastRequestStart.set(System.currentTimeMillis());
      // use the representative URL stored in the datum metadata, if any
      Text reprUrlWritable = (Text) fit.datum.getMetaData()
          .get(Nutch.WRITABLE_REPR_URL_KEY);
      if (reprUrlWritable == null) {
        setReprUrl(fit.url.toString());
      } else {
        setReprUrl(reprUrlWritable.toString());
      }

      try {
        // fetch the page
        redirecting = false;
        redirectCount = 0;

        // Publisher event
        if (activatePublisher) {
          FetcherThreadEvent startEvent = new FetcherThreadEvent(
              PublishEventType.START, fit.getUrl().toString());
          publisher.publish(startEvent, conf);
        }

        // redirect-following loop: each iteration fetches fit.url; a
        // redirect may replace fit and set redirecting = true
        do {
          if (LOG.isInfoEnabled()) {
            LOG.info("{} {} fetching {} (queue crawl delay={}ms)", getName(),
                Thread.currentThread().getId(), fit.url,
                fetchQueues.getFetchItemQueue(fit.queueID).crawlDelay);
          }
          LOG.debug("redirectCount={}", redirectCount);
          redirecting = false;

          Protocol protocol = this.protocolFactory.getProtocol(fit.u);
          BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum,
              robotsTxtContent);
          if (robotsTxtContent != null) {
            outputRobotsTxt(robotsTxtContent);
            robotsTxtContent.clear();
          }

          if (rules.isDeferVisits()) {
            LOG.info("Defer visits for queue {} : {}", fit.queueID, fit.url);
            // retry the fetch item
            if (fetchQueues.timelimitExceeded()) {
              fetchQueues.finishFetchItem(fit, true);
            } else {
              fetchQueues.addFetchItem(fit);
            }
            // but check whether it's time to cancel the queue
            int killedURLs = fetchQueues.checkExceptionThreshold(
                fit.getQueueID(), this.robotsDeferVisitsRetries + 1,
                this.robotsDeferVisitsDelay);
            if (killedURLs != 0) {
              context
                  .getCounter("FetcherStatus", "robots_defer_visits_dropped")
                  .increment(killedURLs);
            }
            continue;
          }

          if (!rules.isAllowed(fit.url.toString())) {
            // unblock
            fetchQueues.finishFetchItem(fit, true);
            LOG.info("Denied by robots.txt: {}", fit.url);
            output(fit.url, fit.datum, null,
                ProtocolStatus.STATUS_ROBOTS_DENIED,
                CrawlDatum.STATUS_FETCH_GONE);
            context.getCounter("FetcherStatus", "robots_denied").increment(1);
            continue;
          }

          if (rules.getCrawlDelay() > 0) {
            if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
              // robots.txt asks for a longer delay than we tolerate: skip
              // the URL entirely. unblock
              fetchQueues.finishFetchItem(fit, true);
              LOG.info("Crawl-Delay for {} too long ({} ms), skipping",
                  fit.url, rules.getCrawlDelay());
              output(fit.url, fit.datum, null,
                  ProtocolStatus.STATUS_ROBOTS_DENIED,
                  CrawlDatum.STATUS_FETCH_GONE);
              context.getCounter("FetcherStatus",
                  "robots_denied_maxcrawldelay").increment(1);
              continue;
            } else {
              // apply the robots.txt delay to the queue, clamped from
              // below by minCrawlDelay
              FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
              long crawlDelay = rules.getCrawlDelay();
              if (crawlDelay < minCrawlDelay) {
                LOG.info(
                    "Crawl-Delay for {} too short ({} ms), adjusting to {} ms",
                    fit.url, rules.getCrawlDelay(), minCrawlDelay);
                crawlDelay = minCrawlDelay;
              }
              fiq.crawlDelay = crawlDelay;
              // FIX: the message was missing the third '{}' placeholder,
              // so the URL argument was silently dropped by SLF4J
              LOG.debug(
                  "Crawl delay for queue: {} is set to {} as per robots.txt. url: {}",
                  fit.queueID, fiq.crawlDelay, fit.url);
            }
          }

          ProtocolOutput output = protocol.getProtocolOutput(fit.url,
              fit.datum);
          ProtocolStatus status = output.getStatus();
          Content content = output.getContent();
          ParseStatus pstatus = null;
          // unblock queue
          fetchQueues.finishFetchItem(fit);

          // used for FetchNode
          if (fetchNode != null) {
            fetchNode.setStatus(status.getCode());
            fetchNode.setFetchTime(System.currentTimeMillis());
            fetchNode.setUrl(fit.url);
          }

          // Publish fetch finish event
          if (activatePublisher) {
            FetcherThreadEvent endEvent = new FetcherThreadEvent(
                PublishEventType.END, fit.getUrl().toString());
            endEvent.addEventData("status", status.getName());
            publisher.publish(endEvent, conf);
          }
          context.getCounter("FetcherStatus", status.getName()).increment(1);

          switch (status.getCode()) {

          case ProtocolStatus.SUCCESS: // got a page
            pstatus = output(fit.url, fit.datum, content, status,
                CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
            updateStatus(content.getContent().length);
            // a successful parse may still signal a (meta-refresh) redirect
            if (pstatus != null && pstatus.isSuccess()
                && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
              String newUrl = pstatus.getMessage();
              int refreshTime = Integer.parseInt(pstatus.getArgs()[1]);
              Text redirUrl = handleRedirect(fit, newUrl,
                  refreshTime < Fetcher.PERM_REFRESH_TIME,
                  Fetcher.CONTENT_REDIR);
              if (redirUrl != null) {
                fit = queueRedirect(redirUrl, fit);
              }
            }
            break;

          case ProtocolStatus.MOVED: // redirect
          case ProtocolStatus.TEMP_MOVED:
            int code;
            boolean temp;
            if (status.getCode() == ProtocolStatus.MOVED) {
              code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
              temp = false;
            } else {
              code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
              temp = true;
            }
            output(fit.url, fit.datum, content, status, code);
            String newUrl = status.getMessage();
            Text redirUrl = handleRedirect(fit, newUrl, temp,
                Fetcher.PROTOCOL_REDIR);
            if (redirUrl != null) {
              fit = queueRedirect(redirUrl, fit);
            } else {
              // stop redirecting
              redirecting = false;
            }
            break;

          case ProtocolStatus.EXCEPTION:
            logError(fit.url, status.getMessage());
            int killedURLs = fetchQueues
                .checkExceptionThreshold(fit.getQueueID());
            if (killedURLs != 0)
              context.getCounter("FetcherStatus",
                  "AboveExceptionThresholdInQueue").increment(killedURLs);
            /* FALLTHROUGH */
          case ProtocolStatus.RETRY: // retry
            output(fit.url, fit.datum, null, status,
                CrawlDatum.STATUS_FETCH_RETRY);
            break;

          case ProtocolStatus.GONE: // gone
          case ProtocolStatus.NOTFOUND:
          case ProtocolStatus.ACCESS_DENIED:
          case ProtocolStatus.ROBOTS_DENIED:
            output(fit.url, fit.datum, null, status,
                CrawlDatum.STATUS_FETCH_GONE);
            break;

          case ProtocolStatus.NOTMODIFIED:
            output(fit.url, fit.datum, null, status,
                CrawlDatum.STATUS_FETCH_NOTMODIFIED);
            break;

          default:
            if (LOG.isWarnEnabled()) {
              LOG.warn("{} {} Unknown ProtocolStatus: {}", getName(),
                  Thread.currentThread().getId(), status.getCode());
            }
            output(fit.url, fit.datum, null, status,
                CrawlDatum.STATUS_FETCH_RETRY);
          }

          if (redirecting && redirectCount > maxRedirect) {
            fetchQueues.finishFetchItem(fit);
            context.getCounter("FetcherStatus", "redirect_count_exceeded")
                .increment(1);
            if (LOG.isInfoEnabled()) {
              LOG.info("{} {} - redirect count exceeded {} ({})", getName(),
                  Thread.currentThread().getId(), fit.url,
                  maxRedirectExceededSkip ? "skipped" : "linked");
            }
            if (maxRedirectExceededSkip) {
              // skip redirect target when redirect count is exceeded
            } else {
              // record the final redirect target as a linked URL instead
              // of fetching it
              Text newUrl = new Text(status.getMessage());
              CrawlDatum newDatum = createRedirDatum(newUrl, fit,
                  CrawlDatum.STATUS_LINKED);
              output(newUrl, newDatum, null, null, CrawlDatum.STATUS_LINKED);
            }
          }

        } while (redirecting && (redirectCount <= maxRedirect));

      } catch (Throwable t) { // unexpected exception
        // unblock
        fetchQueues.finishFetchItem(fit);
        String message;
        if (LOG.isDebugEnabled()) {
          message = StringUtils.stringifyException(t);
        } else if (logUtil.logShort(t)) {
          message = t.getClass().getName();
        } else {
          message = StringUtils.stringifyException(t);
        }
        logError(fit.url, message);
        output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
            CrawlDatum.STATUS_FETCH_RETRY);
      }
    }

  } catch (Throwable e) {
    if (LOG.isErrorEnabled()) {
      LOG.error("fetcher caught:", e);
    }
  } finally {
    // make sure the current item is released and the thread count stays
    // accurate, whatever exit path was taken
    if (fit != null) {
      fetchQueues.finishFetchItem(fit);
    }
    activeThreads.decrementAndGet(); // count threads
    LOG.info("{} {} -finishing thread {}, activeThreads={}", getName(),
        Thread.currentThread().getId(), getName(), activeThreads);
  }
}