in core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java [242:556]
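// Fetches one URL per incoming tuple: checks robots.txt, enforces
// per-key politeness, fetches the content and routes the result to the
// default, status or throttle stream.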
public void execute(Tuple input) {
String urlString = input.getStringByField("url");
if (StringUtils.isBlank(urlString)) {
LOG.info("[Fetcher #{}] Missing value for field url in tuple {}", taskID, input);
// nothing to fetch - ack and move on
collector.ack(input);
return;
}
Metadata metadata = null;
if (input.contains("metadata")) {
metadata = (Metadata) input.getValueByField("metadata");
}
if (metadata == null) {
metadata = new Metadata();
}
// https://github.com/apache/incubator-stormcrawler/issues/813
metadata.remove("fetch.exception");
URL url;
try {
url = new URL(urlString);
} catch (MalformedURLException e) {
LOG.error("{} is a malformed URL", urlString);
// Report to status stream and ack
metadata.setValue(Constants.STATUS_ERROR_CAUSE, "malformed URL");
collector.emit(
org.apache.stormcrawler.Constants.StatusStreamName,
input,
new Values(urlString, metadata, Status.ERROR));
collector.ack(input);
return;
}
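// the politeness key groups URLs that share the same queue,
// typically by host, domain or IP depending on the configuration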
String key = getPolitenessKey(url);
long delay = 0;
try {
activeThreads.incrementAndGet();
Protocol protocol = protocolFactory.getProtocol(url);
BaseRobotRules rules = protocol.getRobotRules(urlString);
boolean fromCache = false;
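// a RobotRules instance with no fetched content lengths means the
// rules were served from the cache rather than fetched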
if (rules instanceof RobotRules
&& ((RobotRules) rules).getContentLengthFetched().length == 0) {
fromCache = true;
eventCounter.scope("robots.fromCache").incrBy(1);
} else {
eventCounter.scope("robots.fetched").incrBy(1);
}
// sitemap autodiscovery: the sitemap URLs are sent down the topology
// only if the robots file did not come from the cache, so that they
// are not emitted repeatedly; the discovery setting can be
// overridden per URL via the metadata
String localSitemapDiscoveryVal = metadata.getFirstValue(SITEMAP_DISCOVERY_PARAM_KEY);
boolean smautodisco;
if ("true".equalsIgnoreCase(localSitemapDiscoveryVal)) {
smautodisco = true;
} else if ("false".equalsIgnoreCase(localSitemapDiscoveryVal)) {
smautodisco = false;
} else {
smautodisco = sitemapsAutoDiscovery;
}
if (!fromCache && smautodisco) {
for (String sitemapURL : rules.getSitemaps()) {
if (rules.isAllowed(sitemapURL)) {
emitOutlink(
input,
url,
sitemapURL,
metadata,
SiteMapParserBolt.isSitemapKey,
"true");
}
}
}
// whether sitemaps were found
// https://github.com/apache/incubator-stormcrawler/issues/710
// note: we don't care whether the sitemap URLs were actually kept
boolean foundSitemap = !rules.getSitemaps().isEmpty();
metadata.setValue(SiteMapParserBolt.foundSitemapKey, Boolean.toString(foundSitemap));
activeThreads.decrementAndGet();
if (!rules.isAllowed(urlString)) {
LOG.info("Denied by robots.txt: {}", urlString);
metadata.setValue(Constants.STATUS_ERROR_CAUSE, "robots.txt");
// Report to status stream and ack
collector.emit(
org.apache.stormcrawler.Constants.StatusStreamName,
input,
new Values(urlString, metadata, Status.ERROR));
collector.ack(input);
return;
}
// check when we are allowed to process it
long timeWaiting = 0;
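// the throttler caches, per politeness key, the earliest time at
// which the next fetch for that key is allowed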
Long timeAllowed = throttler.getIfPresent(key);
if (timeAllowed != null) {
long now = System.currentTimeMillis();
long timeToWait = timeAllowed - now;
if (timeToWait > 0) {
// too long -> send it to the back of the internal queue
if (maxThrottleSleepMSec != -1 && timeToWait > maxThrottleSleepMSec) {
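// emitDirect sends the tuple back to this very task on the
// throttle stream instead of blocking a thread for a long sleep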
collector.emitDirect(
this.taskID,
THROTTLE_STREAM,
input,
new Values(urlString, metadata));
collector.ack(input);
LOG.debug("[Fetcher #{}] sent back to the queue {}", taskID, urlString);
eventCounter.scope("sentBackToQueue").incrBy(1);
return;
}
// not too much of a wait - sleep here
timeWaiting = timeToWait;
try {
Thread.sleep(timeToWait);
} catch (InterruptedException e) {
LOG.error("[Fetcher #{}] caught InterruptedException caught while waiting");
Thread.currentThread().interrupt();
}
}
}
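// start from the configured default delay (fetcher.server.delay);
// robots.txt may override it below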
delay = this.crawlDelay;
// get the delay from robots
// value is negative when not set
long robotsDelay = rules.getCrawlDelay();
if (robotsDelay > 0) {
if (robotsDelay > maxCrawlDelay) {
if (maxCrawlDelayForce) {
// cap the value to a maximum
// as some sites specify ridiculous values
LOG.debug("Delay from robots capped at {} for {}", robotsDelay, url);
delay = maxCrawlDelay;
} else {
LOG.debug(
"Skipped URL from queue with overlong crawl-delay ({}): {}",
robotsDelay,
url);
metadata.setValue(Constants.STATUS_ERROR_CAUSE, "crawl_delay");
collector.emit(
org.apache.stormcrawler.Constants.StatusStreamName,
input,
new Values(urlString, metadata, Status.ERROR));
collector.ack(input);
return;
}
} else if (robotsDelay < crawlDelay && crawlDelayForce) {
LOG.debug(
"Crawl delay for {} too short ({}), set to fetcher.server.delay",
url,
robotsDelay);
delay = crawlDelay;
} else {
delay = robotsDelay;
}
}
LOG.debug("[Fetcher #{}] : Fetching {}", taskID, urlString);
activeThreads.incrementAndGet();
long start = System.currentTimeMillis();
ProtocolResponse response = protocol.getProtocolOutput(urlString, metadata);
long timeFetching = System.currentTimeMillis() - start;
final int byteLength = response.getContent().length;
// get any metrics from the protocol metadata
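// keys are prefixed with "metrics." - the prefix is stripped
// before reporting the value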
response.getMetadata().keySet("metrics.").stream()
.forEach(
s ->
averagedMetrics
.scope(s.substring(8))
.update(
Long.parseLong(
response.getMetadata()
.getFirstValue(s))));
averagedMetrics.scope("wait_time").update(timeWaiting);
averagedMetrics.scope("fetch_time").update(timeFetching);
averagedMetrics.scope("bytes_fetched").update(byteLength);
eventCounter.scope("fetched").incrBy(1);
eventCounter.scope("bytes_fetched").incrBy(byteLength);
perSecMetrics.scope("bytes_fetched_perSec").update(byteLength);
perSecMetrics.scope("fetched_perSec").update(1);
LOG.info(
"[Fetcher #{}] Fetched {} with status {} in {} msec after waiting {} msec",
taskID,
urlString,
response.getStatusCode(),
timeFetching,
timeWaiting);
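// combine the tuple metadata with the metadata returned by the protocol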
Metadata mergedMD = new Metadata();
mergedMD.putAll(metadata);
// add a prefix to avoid confusion and preserve protocol metadata
// persisted or transferred from previous fetches
mergedMD.putAll(response.getMetadata(), protocolMDprefix);
mergedMD.setValue("fetch.statusCode", Integer.toString(response.getStatusCode()));
mergedMD.setValue("fetch.loadingTime", Long.toString(timeFetching));
mergedMD.setValue("fetch.byteLength", Integer.toString(byteLength));
// determine the status based on the status code
final Status status = Status.fromHTTPCode(response.getStatusCode());
eventCounter.scope("status_" + response.getStatusCode()).incrBy(1);
// used when sending to status stream
final Values values4status = new Values(urlString, mergedMD, status);
// if the status is OK emit on default stream
if (status.equals(Status.FETCHED)) {
if (response.getStatusCode() == 304) {
// mark this URL as fetched so that it gets rescheduled,
// but do not try to parse or index
collector.emit(
org.apache.stormcrawler.Constants.StatusStreamName,
input,
values4status);
} else {
collector.emit(
Utils.DEFAULT_STREAM_ID,
input,
new Values(urlString, response.getContent(), mergedMD));
}
} else if (status.equals(Status.REDIRECTION)) {
// find the URL it redirects to
String redirection = response.getMetadata().getFirstValue(HttpHeaders.LOCATION);
// stores the URL it redirects to; used mainly for debugging
// - the target URL is not resolved
if (StringUtils.isNotBlank(redirection)) {
mergedMD.setValue("_redirTo", redirection);
}
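// follow the redirection by emitting the target URL as an outlink, if allowed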
if (allowRedirs() && StringUtils.isNotBlank(redirection)) {
emitOutlink(input, url, redirection, mergedMD);
}
// Mark URL as redirected
collector.emit(
org.apache.stormcrawler.Constants.StatusStreamName, input, values4status);
} else {
// Error
collector.emit(
org.apache.stormcrawler.Constants.StatusStreamName, input, values4status);
}
} catch (Exception exece) {
String message = exece.getMessage();
if (message == null) {
message = "";
}
// common exceptions for which we log only a short message
if (exece.getCause() instanceof java.util.concurrent.TimeoutException
|| message.contains(" timed out")) {
LOG.error("Socket timeout fetching {}", urlString);
message = "Socket timeout fetching";
} else if (exece.getCause() instanceof java.net.UnknownHostException
|| exece instanceof java.net.UnknownHostException) {
LOG.error("Unknown host {}", urlString);
message = "Unknown host";
} else {
LOG.error("Exception while fetching {}", urlString, exece);
message = exece.getClass().getName();
}
eventCounter.scope("exception").incrBy(1);
// the metadata could be an empty, immutable instance
// - replace it with a mutable one before adding the failure reason
if (metadata.size() == 0) {
metadata = new Metadata();
}
// add the reason of the failure in the metadata
metadata.setValue("fetch.exception", message);
collector.emit(
org.apache.stormcrawler.Constants.StatusStreamName,
input,
new Values(urlString, metadata, Status.FETCH_ERROR));
}
activeThreads.decrementAndGet();
// update the throttler
throttler.put(key, System.currentTimeMillis() + delay);
collector.ack(input);
}