in src/java/org/apache/nutch/fetcher/QueueFeeder.java [83:175]
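/**
 * Feed fetch items from the MapReduce input into the fetch queues until
 * the input is exhausted: throttle while the queues are full, and once a
 * timeout or the fetcher time limit is hit, drain the remaining input
 * without queuing it.
 */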
public void run() {
boolean hasMore = true;
int cnt = 0;
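// per-status counters, indexed by QueuingStatus.ordinal()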
int[] queuingStatus = new int[QueuingStatus.values().length];
while (hasMore) {
if (queues.timelimitExceeded() || queues.timoutReached) {
// enough ... let's simply read all remaining entries from the input
// without processing them
if (queues.timoutReached) {
int qstatus = QueuingStatus.HIT_BY_TIMEOUT.ordinal();
if (queuingStatus[qstatus] == 0) {
LOG.info("QueueFeeder stopping, timeout reached.");
}
queuingStatus[qstatus]++;
context.getCounter("FetcherStatus", "hitByTimeout").increment(1);
} else {
int qstatus = QueuingStatus.HIT_BY_TIMELIMIT.ordinal();
if (queuingStatus[qstatus] == 0) {
LOG.info("QueueFeeder stopping, timelimit exceeded.");
}
queuingStatus[qstatus]++;
context.getCounter("FetcherStatus", "hitByTimeLimit").increment(1);
}
try {
hasMore = context.nextKeyValue();
} catch (IOException e) {
LOG.error("QueueFeeder error reading input, record " + cnt, e);
// signal the fetcher threads that the feeder is gone, so they do
// not wait forever for new items
queues.feederAlive = false;
return;
} catch (InterruptedException e) {
LOG.info("QueueFeeder interrupted, exception:", e);
queues.feederAlive = false;
return;
}
continue;
}
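// back-pressure: feed only as many items as the queues have free capacity for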
int feed = size - queues.getTotalSize();
if (feed <= 0) {
// queues are full - sleep briefly and retry until they have free space
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
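// interrupted while waiting; fall through and re-check the queue sizes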
}
continue;
}
LOG.debug("-feeding {} input urls ...", feed);
while (feed > 0 && hasMore) {
try {
hasMore = context.nextKeyValue();
if (hasMore) {
Text url = context.getCurrentKey();
if (urlFilters != null || urlNormalizers != null) {
String u = filterNormalize(url.toString());
if (u == null) {
// filtered or failed to normalize
context.getCounter("FetcherStatus", "filtered").increment(1);
continue;
}
url = new Text(u);
} else {
/*
 * Need to copy the key object because MapReduce will reuse the
 * original object while the copy is stored in the queue; the value
 * is copied below via datum.set(...) for the same reason.
 */
url = new Text(url);
}
CrawlDatum datum = new CrawlDatum();
datum.set(context.getCurrentValue());
QueuingStatus status = queues.addFetchItem(url, datum);
queuingStatus[status.ordinal()]++;
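// count items rejected because their queue already hit the exception threshold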
if (status == QueuingStatus.ABOVE_EXCEPTION_THRESHOLD) {
context
.getCounter("FetcherStatus", "AboveExceptionThresholdInQueue")
.increment(1);
}
cnt++;
feed--;
}
} catch (IOException e) {
LOG.error("QueueFeeder error reading input, record " + cnt, e);
queues.feederAlive = false;
return;
} catch (InterruptedException e) {
LOG.info("QueueFeeder interrupted, exception:", e);
queues.feederAlive = false;
return;
}
}
}
// signal queues that no more new fetch items are added
queues.feederAlive = false;
LOG.info("QueueFeeder finished: total {} records", cnt);
LOG.info("QueueFeeder queuing status:");
for (QueuingStatus status : QueuingStatus.values()) {
LOG.info("\t{}\t{}", queuingStatus[status.ordinal()], status);
}
}
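For context, a minimal sketch of how this feeder is typically launched from the
fetcher task. The QueueFeeder(Context, FetchItemQueues, int) constructor is
assumed from the fields used above; fetchQueues, threadCount, and the capacity
formula are illustrative, not the verbatim Fetcher code:

// hypothetical launch sketch (assumed constructor, illustrative names)
FetchItemQueues fetchQueues = new FetchItemQueues(conf);
int capacity = threadCount * 50;  // target total queue size ("size" above)
QueueFeeder feeder = new QueueFeeder(context, fetchQueues, capacity);
feeder.start();  // run() feeds the queues from its own thread
// fetcher threads consume items while feederAlive is true or queues are non-empty
feeder.join();   // wait until the input is fully drained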