in core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java [808:909]
/**
 * Initialises the bolt.
 *
 * <p>The steps are: validate the configuration, create the fetch queues and protocol factory,
 * read the remaining settings, register the metrics, and finally start the fetcher threads.
 *
 * <p>Every field that the fetcher threads or {@code execute} may read ({@code taskID},
 * {@code fetchQueues}, {@code protocolFactory}, {@code beingFetched},
 * {@code sitemapsAutoDiscovery}, {@code maxNumberURLsInQueues}, {@code debugfiletrigger}) is
 * assigned <em>before</em> the first thread is started. A call to {@link Thread#start()}
 * happens-before every action of the started thread, so the threads are guaranteed to see
 * these values.
 *
 * @param stormConf the topology configuration; copied into a {@link Config} before use
 * @param context the topology context, used for the task id, the worker port and metrics
 * @param collector the output collector, passed on to the superclass
 */
public void prepare(
        Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {
    super.prepare(stormConf, context, collector);

    Config conf = new Config();
    conf.putAll(stormConf);
    checkConfiguration(conf);

    // assign the task id before logging it; it used to be logged while still unset
    this.taskID = context.getThisTaskId();
    LOG.info("[Fetcher #{}] : starting at {}", taskID, Instant.now());

    // create the shared state before the gauges that read it and the threads that use it
    protocolFactory = ProtocolFactory.getInstance(conf);
    this.fetchQueues = new FetchItemQueues(conf);

    int threadCount = ConfUtils.getInt(conf, "fetcher.threads.number", 10);
    int startDelay = ConfUtils.getInt(conf, "fetcher.threads.start.delay", 10);

    // keep track of the URLs in fetching, one slot per fetcher thread
    beingFetched = new String[threadCount];
    Arrays.fill(beingFetched, "");

    sitemapsAutoDiscovery = ConfUtils.getBoolean(conf, SITEMAP_DISCOVERY_PARAM_KEY, false);
    maxNumberURLsInQueues = ConfUtils.getInt(conf, "fetcher.max.urls.in.queues", -1);

    /*
     * If set to a valid path e.g. /tmp/fetcher-dump-{port} on a worker node, the content of the
     * queues will be dumped to the logs for debugging. The port number needs to match the one
     * used by the FetcherBolt instance.
     */
    String debugfiletriggerpattern =
            ConfUtils.getString(conf, "fetcherbolt.queue.debug.filepath");
    if (StringUtils.isNotBlank(debugfiletriggerpattern)) {
        // literal substitution of the {port} placeholder; no regex needed
        debugfiletrigger =
                new File(
                        debugfiletriggerpattern.replace(
                                "{port}", Integer.toString(context.getThisWorkerPort())));
    }

    int metricsTimeBucketSecs = ConfUtils.getInt(conf, "fetcher.metrics.time.bucket.secs", 10);
    // Register a "MultiCountMetric" to count different events in this bolt
    // Storm will emit the counts every n seconds to a special bolt via a
    // system stream
    // The data can be accessed by registering a "MetricConsumer" in the
    // topology
    this.eventCounter =
            context.registerMetric(
                    "fetcher_counter", new MultiCountMetric(), metricsTimeBucketSecs);

    // gauges; fetchQueues has already been created above, so these never see null
    context.registerMetric("activethreads", () -> activeThreads.get(), metricsTimeBucketSecs);
    context.registerMetric(
            "in_queues", () -> fetchQueues.inQueues.get(), metricsTimeBucketSecs);
    context.registerMetric(
            "num_queues", () -> fetchQueues.queues.size(), metricsTimeBucketSecs);

    this.averagedMetrics =
            context.registerMetric(
                    "fetcher_average_perdoc",
                    new MultiReducedMetric(new MeanReducer()),
                    metricsTimeBucketSecs);
    this.perSecMetrics =
            context.registerMetric(
                    "fetcher_average_persec",
                    new MultiReducedMetric(new PerSecondReducer()),
                    metricsTimeBucketSecs);

    // start the threads last, once all the state they may read has been assigned
    for (int i = 0; i < threadCount; i++) {
        if (startDelay > 0 && i > 0) {
            // short delay to avoid that DNS or other resources are temporarily
            // exhausted by all threads fetching simultaneously the first pages
            try {
                Thread.sleep(startDelay);
            } catch (InterruptedException e) {
                // restore the interrupt flag; the remaining threads are still started
                Thread.currentThread().interrupt();
            }
        }
        new FetcherThread(conf, i).start();
    }
}