in src/java/org/apache/nutch/fetcher/Fetcher.java [196:479]
public void run(Context innerContext)
throws IOException, InterruptedException {
setup(innerContext);
try {
Configuration conf = innerContext.getConfiguration();
LinkedList<FetcherThread> fetcherThreads = new LinkedList<>();
FetchItemQueues fetchQueues = new FetchItemQueues(conf);
QueueFeeder feeder;
int threadCount = conf.getInt("fetcher.threads.fetch", 10);
LOG.info("Fetcher: threads: {}", threadCount);
// NUTCH-2582: adapt Tika MIME detector pool size to thread count
MimeUtil.setPoolSize(Math.max(10, threadCount / 2));
int timeoutDivisor = conf.getInt("fetcher.threads.timeout.divisor", 2);
LOG.info("Fetcher: time-out divisor: {}", timeoutDivisor);
int queueDepthMultiplier = conf.getInt("fetcher.queue.depth.multiplier",
50);
feeder = new QueueFeeder(innerContext, fetchQueues,
threadCount * queueDepthMultiplier);
feeder.start();
int startDelay = conf.getInt("fetcher.threads.start.delay", 10);
for (int i = 0; i < threadCount; i++) { // spawn threads
if (startDelay > 0 && i > 0) {
// short delay to avoid temporary exhaustion of DNS or other
// resources when all threads fetch their first pages simultaneously
Thread.sleep(startDelay);
}
FetcherThread t = new FetcherThread(conf, getActiveThreads(),
fetchQueues, feeder, spinWaiting, lastRequestStart, innerContext,
errors, segmentName, parsing, storingContent, pages, bytes);
fetcherThreads.add(t);
t.start();
}
// choose a timeout well below the MapReduce task timeout, so hung
// fetches are detected before the task itself is killed
long timeout = conf.getInt("mapreduce.task.timeout", 10 * 60 * 1000)
/ timeoutDivisor;
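// e.g. with the default task timeout of 10 minutes and a divisor of
// 2, fetching is aborted after 5 minutes without any new fetch request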
// used for the throughput threshold check: pages and bytes
// processed during the last second
int pagesLastSec;
int bytesLastSec;
int throughputThresholdNumRetries = 0;
int throughputThresholdPages = conf
.getInt("fetcher.throughput.threshold.pages", -1);
LOG.info("Fetcher: throughput threshold: {}", throughputThresholdPages);
int throughputThresholdMaxRetries = conf
.getInt("fetcher.throughput.threshold.retries", 5);
LOG.info("Fetcher: throughput threshold retries: {}",
throughputThresholdMaxRetries);
long throughputThresholdTimeLimit = conf
.getLong("fetcher.throughput.threshold.check.after", -1);
int targetBandwidth = conf.getInt("fetcher.bandwidth.target", -1)
* 1000;
int maxNumThreads = conf.getInt("fetcher.maxNum.threads", threadCount);
if (maxNumThreads < threadCount) {
LOG.info(
"fetcher.maxNum.threads can't be less than {}: using {} instead",
threadCount, threadCount);
maxNumThreads = threadCount;
}
int bandwidthTargetCheckEveryNSecs = conf
.getInt("fetcher.bandwidth.target.check.everyNSecs", 30);
if (bandwidthTargetCheckEveryNSecs < 1) {
LOG.info(
"fetcher.bandwidth.target.check.everyNSecs can't be less than 1: using 1 instead");
bandwidthTargetCheckEveryNSecs = 1;
}
int maxThreadsPerQueue = conf.getInt("fetcher.threads.per.queue", 1);
int bandwidthTargetCheckCounter = 0;
long bytesAtLastBWTCheck = 0L;
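// status loop: wakes up about once per second, reports progress, and
// enforces the throughput, bandwidth, and timeout checks below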
do { // wait for threads to exit
pagesLastSec = pages.get();
bytesLastSec = (int) bytes.get();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignored: the one-second reporting interval is not critical
}
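// per-second deltas derived from the cumulative counters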
pagesLastSec = pages.get() - pagesLastSec;
bytesLastSec = (int) bytes.get() - bytesLastSec;
innerContext.getCounter("FetcherStatus", "bytes_downloaded")
.increment(bytesLastSec);
reportStatus(innerContext, fetchQueues, pagesLastSec, bytesLastSec);
LOG.info("-activeThreads=" + activeThreads + ", spinWaiting="
+ spinWaiting.get() + ", fetchQueues.totalSize="
+ fetchQueues.getTotalSize() + ", fetchQueues.getQueueCount="
+ fetchQueues.getQueueCount());
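// the feeder is done and the queues are nearly drained: log what is left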
if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
fetchQueues.dump();
}
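// Example: with fetcher.throughput.threshold.pages=2 and
// fetcher.throughput.threshold.retries=5, the queues are emptied
// after five one-second intervals below 2 pages/sec.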
// if throughput threshold is enabled
if (throughputThresholdTimeLimit < System.currentTimeMillis()
&& throughputThresholdPages != -1) {
// Check if we're dropping below the threshold
if (pagesLastSec < throughputThresholdPages) {
throughputThresholdNumRetries++;
LOG.warn(
"{}: dropping below configured threshold of {} pages per second (current throughput: {} pages/sec.)",
throughputThresholdNumRetries, throughputThresholdPages,
pagesLastSec);
// Quit if we dropped below threshold too many times
if (throughputThresholdNumRetries >= throughputThresholdMaxRetries) {
LOG.warn(
"Dropped below threshold {} times, dropping fetch queues to shut down",
throughputThresholdNumRetries);
// Disable the threshold checker
throughputThresholdPages = -1;
// Empty the queues cleanly and get the number of items dropped
int hitByThroughputThreshold = fetchQueues.emptyQueues();
if (hitByThroughputThreshold != 0)
innerContext
// counter name keeps its historical spelling
.getCounter("FetcherStatus", "hitByThrougputThreshold")
.increment(hitByThroughputThreshold);
}
}
}
// adjust the number of threads if a target bandwidth has been set
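// Example: fetcher.bandwidth.target is given in kbits/sec; with a
// value of 1000, the measured bits/sec are compared against
// 1,000,000 every fetcher.bandwidth.target.check.everyNSecs seconds
// and threads are started or halted accordingly.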
if (targetBandwidth > 0) {
if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs)
bandwidthTargetCheckCounter++;
else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
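// average bits per second since the last check (bytes * 8 / interval)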
long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8)
/ bandwidthTargetCheckEveryNSecs;
bytesAtLastBWTCheck = bytes.get();
bandwidthTargetCheckCounter = 0;
int averageBdwPerThread = 0;
if (activeThreads.get() > 0)
averageBdwPerThread = (int) (bpsSinceLastCheck
/ activeThreads.get());
LOG.info("averageBdwPerThread : {} kbps",
(averageBdwPerThread / 1000));
if (bpsSinceLastCheck < targetBandwidth
&& averageBdwPerThread > 0) {
// only add threads if there are enough queues to keep them busy
if ((fetchQueues.getQueueCount()
* maxThreadsPerQueue) > activeThreads.get()) {
long remainingBdw = targetBandwidth - bpsSinceLastCheck;
int additionalThreads = Math
.round((float) remainingBdw / averageBdwPerThread);
int availableThreads = maxNumThreads - activeThreads.get();
// add at most as many threads as are still available
additionalThreads = Math.min(availableThreads, additionalThreads);
LOG.info(
"Has space for more threads ({} vs {} kbps) \t=> adding {} new threads",
(bpsSinceLastCheck / 1000), (targetBandwidth / 1000),
additionalThreads);
// activate new threads
for (int i = 0; i < additionalThreads; i++) {
FetcherThread thread = new FetcherThread(conf,
getActiveThreads(), fetchQueues, feeder, spinWaiting,
lastRequestStart, innerContext, errors, segmentName,
parsing, storingContent, pages, bytes);
fetcherThreads.add(thread);
thread.start();
}
}
} else if (bpsSinceLastCheck > targetBandwidth
&& averageBdwPerThread > 0) {
// if the bandwidth we're using is greater than the target
// bandwidth, we have to stop some threads
long excessBdw = bpsSinceLastCheck - targetBandwidth;
int excessThreads = Math
.round((float) excessBdw / averageBdwPerThread);
LOG.info(
"Exceeding target bandwidth ({} vs {} kbps). \t=> excessThreads = {}",
bpsSinceLastCheck / 1000, (targetBandwidth / 1000),
excessThreads);
// keep at least one
if (excessThreads >= fetcherThreads.size())
excessThreads = 0;
// deactivate the excess threads
for (int i = 0; i < excessThreads; i++) {
FetcherThread thread = fetcherThreads.removeLast();
thread.setHalted(true);
}
}
}
}
// check the fetcher time limit once the feeder has finished
if (!feeder.isAlive()) {
int hitByTimeLimit = fetchQueues.checkTimelimit();
if (hitByTimeLimit != 0)
innerContext.getCounter("FetcherStatus", "hitByTimeLimit")
.increment(hitByTimeLimit);
}
/*
* Some requests seem to hang, with no fetches finished and no new
* fetches started during half of the MapReduce task timeout
* (mapreduce.task.timeout, default value: 10 minutes). To avoid
* hitting the task timeout and failing the fetcher job, we stop
* fetching now. See also the property
* fetcher.threads.timeout.divisor.
*/
if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
LOG.warn("Timeout reached with no new requests since {} seconds.",
timeout);
LOG.warn("Aborting with {} hung threads{}.", activeThreads,
feeder.isAlive() ? " (queue feeder still alive)" : "");
innerContext.getCounter("FetcherStatus", "hungThreads")
.increment(activeThreads.get());
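// log the URL being processed and a stack trace for each hung thread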
for (int i = 0; i < fetcherThreads.size(); i++) {
FetcherThread thread = fetcherThreads.get(i);
if (thread.isAlive()) {
LOG.warn("Thread #{} hung while processing {}", i,
thread.getReprUrl());
StackTraceElement[] stack = thread.getStackTrace();
StringBuilder sb = new StringBuilder();
sb.append("Stack of thread #").append(i).append(":\n");
for (StackTraceElement s : stack) {
sb.append(s.toString()).append('\n');
}
LOG.warn(sb.toString());
}
}
/*
* signal the queue feeder that the timeout is reached and wait
* shortly for it to shut down
*/
fetchQueues.setTimeoutReached();
if (feeder.isAlive()) {
LOG.info(
"Signaled QueueFeeder to stop, waiting 1.5 seconds before exiting.");
Thread.sleep(1500);
}
/*
* log and count queued items dropped from the fetch queues because
* of the timeout
*/
LOG.warn("Aborting with {} queued fetch items in {} queues{}.",
fetchQueues.getTotalSize(), fetchQueues.getQueueCount(),
feeder.isAlive() ? " (queue feeder still alive)" : "");
int hitByTimeout = fetchQueues.emptyQueues();
innerContext.getCounter("FetcherStatus", "hitByTimeout")
.increment(hitByTimeout);
return;
}
} while (activeThreads.get() > 0);
LOG.info("-activeThreads={}", activeThreads);
} finally {
cleanup(innerContext);
}
}