in core/src/main/java/org/apache/stormcrawler/bolt/SimpleFetcherBolt.java [141:227]
public void prepare(
        Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {
    // Sets this bolt up: copies the configuration, registers the metrics and
    // gauges with the topology context, obtains the protocol factory and reads
    // the queue-mode and crawl-delay settings into the corresponding fields.
    super.prepare(stormConf, context, collector);

    // Work on our own Config instance holding every entry of stormConf
    this.conf = new Config();
    this.conf.putAll(stormConf);
    checkConfiguration();

    this.taskID = context.getThisTaskId();

    // Log the start time of this task
    SimpleDateFormat timestampFormat =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH);
    long startedAt = System.currentTimeMillis();
    LOG.info("[Fetcher #{}] : starting at {}", taskID, timestampFormat.format(startedAt));

    // All metrics and gauges below are registered with the same time bucket,
    // read from "fetcher.metrics.time.bucket.secs" (10 when not configured).
    // A "MetricConsumer" registered in the topology can be used to access
    // the values Storm emits for them.
    final int bucketSecs = ConfUtils.getInt(conf, "fetcher.metrics.time.bucket.secs", 10);

    // Counter for the different events happening in this bolt
    MultiCountMetric counterMetric = new MultiCountMetric();
    this.eventCounter = context.registerMetric("fetcher_counter", counterMetric, bucketSecs);

    // Values reduced with a MeanReducer
    MultiReducedMetric meanMetric = new MultiReducedMetric(new MeanReducer());
    this.averagedMetrics = context.registerMetric("fetcher_average", meanMetric, bucketSecs);

    // Values reduced with a PerSecondReducer
    MultiReducedMetric perSecondMetric = new MultiReducedMetric(new PerSecondReducer());
    this.perSecMetrics =
            context.registerMetric("fetcher_average_persec", perSecondMetric, bucketSecs);

    // Gauge reporting the current value of activeThreads
    IMetric activeThreadsGauge =
            new IMetric() {
                @Override
                public Object getValueAndReset() {
                    return activeThreads.get();
                }
            };
    context.registerMetric("activethreads", activeThreadsGauge, bucketSecs);

    // Gauge reporting the estimated size of the throttler
    IMetric throttlerSizeGauge =
            new IMetric() {
                @Override
                public Object getValueAndReset() {
                    return throttler.estimatedSize();
                }
            };
    context.registerMetric("throttler_size", throttlerSizeGauge, bucketSecs);

    protocolFactory = ProtocolFactory.getInstance(conf);

    sitemapsAutoDiscovery = ConfUtils.getBoolean(stormConf, SITEMAP_DISCOVERY_PARAM_KEY, false);

    // Queue mode: anything other than the three known values is replaced by
    // the host mode, after logging an error.
    queueMode = ConfUtils.getString(conf, "fetcher.queue.mode", QUEUE_MODE_HOST);
    boolean knownQueueMode =
            queueMode.equals(QUEUE_MODE_IP)
                    || queueMode.equals(QUEUE_MODE_DOMAIN)
                    || queueMode.equals(QUEUE_MODE_HOST);
    if (!knownQueueMode) {
        LOG.error("Unknown partition mode : {} - forcing to byHost", queueMode);
        queueMode = QUEUE_MODE_HOST;
    }
    LOG.info("Using queue mode : {}", queueMode);

    // Both delays are multiplied by 1000 before being stored
    // (presumably seconds converted to milliseconds - to be confirmed).
    float serverDelay = ConfUtils.getFloat(conf, "fetcher.server.delay", 1.0f);
    this.crawlDelay = (long) (serverDelay * 1000);

    int maxDelay = ConfUtils.getInt(conf, "fetcher.max.crawl.delay", 30);
    this.maxCrawlDelay = 1000L * maxDelay;

    this.maxCrawlDelayForce =
            ConfUtils.getBoolean(conf, "fetcher.max.crawl.delay.force", false);
    this.crawlDelayForce = ConfUtils.getBoolean(conf, "fetcher.server.delay.force", false);

    this.maxThrottleSleepMSec = ConfUtils.getLong(conf, "fetcher.max.throttle.sleep", -1);

    // The current value of protocolMDprefix is kept when the key is not configured
    String configuredPrefix =
            ConfUtils.getString(
                    conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, protocolMDprefix);
    this.protocolMDprefix = configuredPrefix;
}