in core/src/main/java/org/apache/stormcrawler/persistence/AbstractStatusUpdaterBolt.java [97:136]
public void prepare(
Map<String, Object> stormConf, TopologyContext context, OutputCollector collector) {
_collector = collector;
scheduler = Scheduler.getInstance(stormConf);
mdTransfer = MetadataTransfer.getInstance(stormConf);
useCache = ConfUtils.getBoolean(stormConf, useCacheParamName, true);
if (useCache) {
String spec = ConfUtils.getString(stormConf, cacheConfigParamName);
cache = Caffeine.from(spec).build();
context.registerMetric(
"cache",
new IMetric() {
@Override
public Object getValueAndReset() {
Map<String, Long> statsMap = new HashMap<>();
statsMap.put("hits", cacheHits);
statsMap.put("misses", cacheMisses);
statsMap.put("size", cache.estimatedSize());
cacheHits = 0;
cacheMisses = 0;
return statsMap;
}
},
30);
}
maxFetchErrors = ConfUtils.getInt(stormConf, maxFetchErrorsParamName, 3);
String tmpdateround = ConfUtils.getString(stormConf, roundDateParamName, "SECOND");
if (tmpdateround.equalsIgnoreCase("MINUTE")) {
roundDateUnit = Calendar.MINUTE;
} else if (tmpdateround.equalsIgnoreCase("HOUR")) {
roundDateUnit = Calendar.HOUR;
}
}