in core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java [472:791]
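// Fetcher thread main loop: polls the shared FetchItemQueues, enforces
// robots.txt and politeness rules, fetches the URL via the configured
// protocol, and emits the outcome on the appropriate stream.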
public void run() {
while (true) {
FetchItem fit = fetchQueues.getFetchItem();
if (fit == null) {
LOG.trace("{} spin-waiting ...", getName());
// spin-wait.
spinWaiting.incrementAndGet();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("{} caught interrupted exception", getName());
Thread.currentThread().interrupt();
}
spinWaiting.decrementAndGet();
continue;
}
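// bookkeeping: record which URL this thread is fetching so that
// long-running or stuck fetches can be reported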
activeThreads.incrementAndGet(); // count threads
beingFetched[threadNum] = fit.url;
LOG.debug(
"[Fetcher #{}] {} => activeThreads={}, spinWaiting={}, queueID={}",
taskID,
getName(),
activeThreads,
spinWaiting,
fit.queueID);
LOG.debug("[Fetcher #{}] {} : Fetching {}", taskID, getName(), fit.url);
Metadata metadata = null;
if (fit.t.contains("metadata")) {
metadata = (Metadata) fit.t.getValueByField("metadata");
}
if (metadata == null) {
metadata = new Metadata();
}
// clear any stale fetch.exception left over from a previous attempt
// https://github.com/apache/incubator-stormcrawler/issues/813
metadata.remove("fetch.exception");
boolean asap = false;
try {
URL url = new URL(fit.url);
Protocol protocol = protocolFactory.getProtocol(url);
if (protocol == null)
throw new RuntimeException(
"No protocol implementation found for " + fit.url);
BaseRobotRules rules = protocol.getRobotRules(fit.url);
boolean fromCache = false;
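// a RobotRules instance with no recorded fetched content length was
// served from the robots cache rather than fetched over the network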
if (rules instanceof RobotRules
&& ((RobotRules) rules).getContentLengthFetched().length == 0) {
fromCache = true;
eventCounter.scope("robots.fromCache").incrBy(1);
} else {
eventCounter.scope("robots.fetched").incrBy(1);
}
// sitemap auto-discovery: sitemap URLs declared in robots.txt are
// sent down the topology, but only when the robots file was freshly
// fetched (not served from the cache), to avoid emitting them
// repeatedly; the per-URL metadata may override the global setting
String localSitemapDiscoveryVal =
metadata.getFirstValue(SITEMAP_DISCOVERY_PARAM_KEY);
boolean smautodisco;
if ("true".equalsIgnoreCase(localSitemapDiscoveryVal)) {
smautodisco = true;
} else if ("false".equalsIgnoreCase(localSitemapDiscoveryVal)) {
smautodisco = false;
} else {
smautodisco = sitemapsAutoDiscovery;
}
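// e.g. a URL whose metadata sets SITEMAP_DISCOVERY_PARAM_KEY to
// "false" opts out of discovery even when it is enabled globally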
if (!fromCache && smautodisco) {
for (String sitemapURL : rules.getSitemaps()) {
if (rules.isAllowed(sitemapURL)) {
emitOutlink(
fit.t,
url,
sitemapURL,
metadata,
SiteMapParserBolt.isSitemapKey,
"true");
}
}
}
// record whether the robots.txt declared any sitemaps
// https://github.com/apache/incubator-stormcrawler/issues/710
// note: we don't care whether the sitemap URLs were actually kept
boolean foundSitemap = !rules.getSitemaps().isEmpty();
metadata.setValue(
SiteMapParserBolt.foundSitemapKey, Boolean.toString(foundSitemap));
if (!rules.isAllowed(fit.url)) {
LOG.info("Denied by robots.txt: {}", fit.url);
// pass the info about denied by robots
metadata.setValue(Constants.STATUS_ERROR_CAUSE, "robots.txt");
collector.emit(
org.apache.stormcrawler.Constants.StatusStreamName,
fit.t,
new Values(fit.url, metadata, Status.ERROR));
// no request was made, so the queue does not need to
// observe the politeness delay before the next fetch
asap = true;
continue;
}
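// reconcile the Crawl-Delay advertised by robots.txt with the
// delays configured for the fetcher before requesting from this queue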
FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID, metadata);
if (rules.getCrawlDelay() > 0 && rules.getCrawlDelay() != fiq.crawlDelay) {
if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
boolean force = false;
String msg = "skipping";
if (maxCrawlDelayForce) {
force = true;
msg = "using value of fetcher.max.crawl.delay instead";
}
LOG.info(
"Crawl-Delay for {} too long ({}), {}",
fit.url,
rules.getCrawlDelay(),
msg);
if (force) {
fiq.crawlDelay = maxCrawlDelay;
} else {
// pass the info about crawl delay
metadata.setValue(Constants.STATUS_ERROR_CAUSE, "crawl_delay");
collector.emit(
org.apache.stormcrawler.Constants.StatusStreamName,
fit.t,
new Values(fit.url, metadata, Status.ERROR));
// no request was made, so the queue does not need to
// observe the politeness delay before the next fetch
asap = true;
continue;
}
} else if (rules.getCrawlDelay() < fetchQueues.crawlDelay
&& crawlDelayForce) {
fiq.crawlDelay = fetchQueues.crawlDelay;
LOG.info(
"Crawl delay for {} too short ({}), set to fetcher.server.delay",
fit.url,
rules.getCrawlDelay());
} else {
fiq.crawlDelay = rules.getCrawlDelay();
LOG.info(
"Crawl delay for queue: {} is set to {} as per robots.txt. url: {}",
fit.queueID,
fiq.crawlDelay,
fit.url);
}
}
long start = System.currentTimeMillis();
long timeInQueues = start - fit.creationTime;
// the item has waited in the queue longer than the configured
// timeout and the tuple has most likely already been failed
// upstream - skip the fetch
if (timeoutInQueues != -1 && timeInQueues > timeoutInQueues * 1000) {
LOG.info(
"[Fetcher #{}] Waited in queue for too long - {}", taskID, fit.url);
// no request was made, so the queue does not need to
// observe the politeness delay before the next fetch
asap = true;
continue;
}
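// the fetch itself - a blocking call to the protocol implementation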
ProtocolResponse response = protocol.getProtocolOutput(fit.url, metadata);
long timeFetching = System.currentTimeMillis() - start;
final int byteLength = response.getContent().length;
// forward any metrics exposed in the protocol metadata
// (keys prefixed with "metrics.") - values are expected to be Longs
response.getMetadata().keySet("metrics.").stream()
.forEach(
s ->
averagedMetrics
// strip the "metrics." prefix (8 characters)
.scope(s.substring(8))
.update(
Long.parseLong(
response.getMetadata()
.getFirstValue(s))));
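// for illustration: a hypothetical protocol metadata entry
// "metrics.time_in_dns" = "12" would be reported as the averaged
// metric "time_in_dns" with value 12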
averagedMetrics.scope("fetch_time").update(timeFetching);
averagedMetrics.scope("time_in_queues").update(timeInQueues);
averagedMetrics.scope("bytes_fetched").update(byteLength);
perSecMetrics.scope("bytes_fetched_perSec").update(byteLength);
perSecMetrics.scope("fetched_perSec").update(1);
eventCounter.scope("fetched").incrBy(1);
eventCounter.scope("bytes_fetched").incrBy(byteLength);
LOG.info(
"[Fetcher #{}] Fetched {} with status {} in msec {}",
taskID,
fit.url,
response.getStatusCode(),
timeFetching);
// merge the original metadata with the metadata returned
// by the protocol
Metadata mergedMD = new Metadata();
mergedMD.putAll(metadata);
// prefix the protocol metadata to avoid clashes and to preserve
// protocol metadata persisted or transferred from previous fetches
mergedMD.putAll(response.getMetadata(), protocolMDprefix);
mergedMD.setValue(
"fetch.statusCode", Integer.toString(response.getStatusCode()));
mergedMD.setValue("fetch.byteLength", Integer.toString(byteLength));
mergedMD.setValue("fetch.loadingTime", Long.toString(timeFetching));
mergedMD.setValue("fetch.timeInQueues", Long.toString(timeInQueues));
// determine the status based on the status code
final Status status = Status.fromHTTPCode(response.getStatusCode());
eventCounter.scope("status_" + response.getStatusCode()).incrBy(1);
final Values tupleToSend = new Values(fit.url, mergedMD, status);
// a successful fetch goes to the default stream, unless it is a 304
if (status.equals(Status.FETCHED)) {
if (response.getStatusCode() == 304) {
// 304 Not Modified: mark this URL as fetched so that it
// gets rescheduled, but do not parse or index the content
collector.emit(Constants.StatusStreamName, fit.t, tupleToSend);
} else {
// send content for parsing
collector.emit(
Utils.DEFAULT_STREAM_ID,
fit.t,
new Values(fit.url, response.getContent(), mergedMD));
}
} else if (status.equals(Status.REDIRECTION)) {
// find the URL it redirects to
String redirection =
response.getMetadata().getFirstValue(HttpHeaders.LOCATION);
// store the redirection target (mainly for debugging);
// the target is deliberately not resolved against the source URL
if (StringUtils.isNotBlank(redirection)) {
mergedMD.setValue("_redirTo", redirection);
}
// emit the redirection target as an outlink when redirections are followed
// https://github.com/apache/incubator-stormcrawler/issues/954
if (allowRedirs() && StringUtils.isNotBlank(redirection)) {
emitOutlink(fit.t, url, redirection, mergedMD);
}
// mark this URL as redirected
collector.emit(Constants.StatusStreamName, fit.t, tupleToSend);
}
// any other status goes to the status stream as an error
else {
collector.emit(Constants.StatusStreamName, fit.t, tupleToSend);
}
} catch (Exception exece) {
String message = exece.getMessage();
if (message == null) message = "";
// common exceptions for which we log only a short message
if (exece.getCause() instanceof java.util.concurrent.TimeoutException
|| message.contains(" timed out")) {
LOG.info("Socket timeout fetching {}", fit.url);
message = "Socket timeout fetching";
} else if (exece.getCause() instanceof java.net.UnknownHostException
|| exece instanceof java.net.UnknownHostException) {
LOG.info("Unknown host {}", fit.url);
message = "Unknown host";
} else {
message = exece.getClass().getName();
if (LOG.isDebugEnabled()) {
LOG.debug("Exception while fetching {}", fit.url, exece);
} else {
LOG.info("Exception while fetching {} -> {}", fit.url, message);
}
}
// the tuple's Metadata may be a shared (possibly immutable) empty
// instance; replace it with a writable one before adding to it
if (metadata.size() == 0) {
metadata = new Metadata();
}
// record the reason for the failure in the metadata
metadata.setValue("fetch.exception", message);
// send to status stream
collector.emit(
Constants.StatusStreamName,
fit.t,
new Values(fit.url, metadata, Status.FETCH_ERROR));
eventCounter.scope("exception").incrBy(1);
} finally {
fetchQueues.finishFetchItem(fit, asap);
activeThreads.decrementAndGet(); // count threads
// always ack the tuple: failures are reported via the status
// stream rather than through Storm's fail mechanism
collector.ack(fit.t);
beingFetched[threadNum] = "";
}
}
}