in core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java [923:989]
/**
 * Processes one incoming tuple.
 *
 * <p>Tick tuples are only used to check for the debug trigger file: when {@code debugfiletrigger}
 * is set and exists, the content of the queues is logged and the file is deleted.
 *
 * <p>For any other tuple, when {@code maxNumberURLsInQueues} is not -1, this method first blocks,
 * polling every 500 ms, until the number of active threads plus queued items drops below that
 * limit. It then reads the {@code url} field:
 *
 * <ul>
 *   <li>blank URL: the tuple is acked and ignored;
 *   <li>malformed URL: an ERROR status carrying the tuple's {@code metadata} (or a fresh
 *       {@code Metadata} if absent) is emitted to the status stream, then the tuple is acked;
 *   <li>otherwise: the URL is handed to {@code fetchQueues.addFetchItem}, and the tuple is failed
 *       if it could not be added.
 * </ul>
 *
 * <p>If the thread is interrupted while waiting for queue capacity, the interrupt flag is
 * restored and the tuple is failed.
 *
 * @param input the tuple to process
 */
public void execute(Tuple input) {
    if (TupleUtils.isTick(input)) {
        // detect whether there is a file indicating that we should
        // dump the content of the queues to the log
        if (debugfiletrigger != null && debugfiletrigger.exists()) {
            LOG.info("Found trigger file {}", debugfiletrigger);
            logQueuesContent();
            // if the file cannot be removed, the queues would be dumped on every tick
            if (!debugfiletrigger.delete()) {
                LOG.warn("Could not delete trigger file {}", debugfiletrigger);
            }
        }
        return;
    }

    // back-pressure: wait until there is room in the queues
    if (this.maxNumberURLsInQueues != -1) {
        while (this.activeThreads.get() + this.fetchQueues.inQueues.get()
                >= maxNumberURLsInQueues) {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                LOG.error("Interrupted exception caught in execute method");
                // Restore the interrupt status and stop waiting: sleeping again on an
                // interrupted thread throws immediately, which would turn this loop into
                // a busy spin. Fail the tuple so that it can be replayed.
                Thread.currentThread().interrupt();
                collector.fail(input);
                return;
            }
            LOG.debug(
                    "[Fetcher #{}] Threads : {}\tqueues : {}\tin_queues : {}",
                    taskID,
                    this.activeThreads.get(),
                    this.fetchQueues.queues.size(),
                    this.fetchQueues.inQueues.get());
        }
    }

    final String urlString = input.getStringByField("url");
    if (StringUtils.isBlank(urlString)) {
        LOG.info("[Fetcher #{}] Missing value for field url in tuple {}", taskID, input);
        // ignore silently
        collector.ack(input);
        return;
    }

    LOG.debug("Received in Fetcher {}", urlString);

    URL url;
    try {
        url = new URL(urlString);
    } catch (MalformedURLException e) {
        LOG.error("{} is a malformed URL", urlString);
        Metadata metadata = (Metadata) input.getValueByField("metadata");
        if (metadata == null) {
            metadata = new Metadata();
        }
        // Report to status stream and ack
        metadata.setValue(Constants.STATUS_ERROR_CAUSE, "malformed URL");
        collector.emit(
                org.apache.stormcrawler.Constants.StatusStreamName,
                input,
                new Values(urlString, metadata, Status.ERROR));
        collector.ack(input);
        return;
    }

    // the queues refused the item: fail the tuple
    boolean added = fetchQueues.addFetchItem(url, urlString, input);
    if (!added) {
        collector.fail(input);
    }
}