in src/main/java/org/opensearch/performanceanalyzer/collectors/ScheduledMetricCollectorsExecutor.java [83:164]
/**
 * Scheduler loop for the registered metrics collectors.
 *
 * <p>On first use the collector thread pool is created lazily. The loop then sleeps so that
 * consecutive iterations start roughly {@code minTimeIntervalToSleep} milliseconds apart and,
 * while {@code getEnabled()} is true, walks {@code metricsCollectors} (collector mapped to its
 * next scheduled time in epoch millis). For every collector whose scheduled time has arrived:
 *
 * <ul>
 *   <li>a MUTED collector only gets the {@code COLLECTORS_MUTED} stat bumped and is not
 *       rescheduled;
 *   <li>otherwise its next scheduled time is advanced by {@code getTimeInterval()};
 *   <li>if it is not in progress it is handed to the thread pool;
 *   <li>if it is still in progress, this interval is skipped and its state is escalated
 *       HEALTHY -> SLOW -> MUTED, except for {@code StatsCollector}, which is skipped without
 *       being escalated.
 * </ul>
 *
 * <p>The loop has no exit condition; a failed or interrupted sleep is logged and the loop
 * carries on.
 */
public void run() {
    Thread.currentThread().setName(this.getClass().getSimpleName());
    if (metricsCollectorsTP == null) {
        ThreadFactory taskThreadFactory =
                new ThreadFactoryBuilder()
                        .setNameFormat(COLLECTOR_THREAD_POOL_NAME)
                        .setDaemon(true)
                        .build();
        metricsCollectorsTP =
                new ThreadPoolExecutor(
                        collectorThreadCount,
                        collectorThreadCount,
                        COLLECTOR_THREAD_KEEPALIVE_SECS,
                        TimeUnit.SECONDS,
                        // ArrayBlockingQueue rejects a capacity below 1, so clamp it in case
                        // no collector has been registered yet.
                        new ArrayBlockingQueue<>(Math.max(1, metricsCollectors.size())),
                        taskThreadFactory);
    }
    long prevStartTimestamp = System.currentTimeMillis();
    while (true) {
        try {
            // Sleep only for what is left of the interval after the previous iteration's work.
            long millisToSleep =
                    minTimeIntervalToSleep - System.currentTimeMillis() + prevStartTimestamp;
            if (millisToSleep > 0) {
                Thread.sleep(millisToSleep);
            }
        } catch (Exception ex) {
            LOG.error("Exception in Thread Sleep", ex);
        }
        prevStartTimestamp = System.currentTimeMillis();
        if (getEnabled()) {
            long currentTime = System.currentTimeMillis();
            for (Map.Entry<PerformanceAnalyzerMetricsCollector, Long> entry :
                    metricsCollectors.entrySet()) {
                if (entry.getValue() <= currentTime) {
                    PerformanceAnalyzerMetricsCollector collector = entry.getKey();
                    if (collector.getState()
                            == PerformanceAnalyzerMetricsCollector.State.MUTED) {
                        PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
                                WriterMetrics.COLLECTORS_MUTED,
                                collector.getCollectorName(),
                                1);
                        continue;
                    }
                    // Advance the schedule whether or not the collector is dispatched now.
                    metricsCollectors.put(
                            collector, entry.getValue() + collector.getTimeInterval());
                    if (!collector.inProgress()) {
                        collector.setStartTime(currentTime);
                        metricsCollectorsTP.execute(collector);
                    } else {
                        // StatsCollector is relied on for framework service metrics, so it is
                        // never demoted to SLOW/MUTED. Skip only this collector for this
                        // interval: returning here would end the scheduling loop and stop
                        // every collector from being dispatched again.
                        if (collector
                                .getCollectorName()
                                .equals(StatsCollector.COLLECTOR_NAME)) {
                            LOG.info(
                                    " {} is still in progress; skipping.",
                                    StatsCollector.COLLECTOR_NAME);
                            continue;
                        }
                        // Each overrun escalates the collector one step: HEALTHY -> SLOW -> MUTED.
                        if (collector.getState()
                                == PerformanceAnalyzerMetricsCollector.State.HEALTHY) {
                            collector.setState(PerformanceAnalyzerMetricsCollector.State.SLOW);
                        } else if (collector.getState()
                                == PerformanceAnalyzerMetricsCollector.State.SLOW) {
                            collector.setState(PerformanceAnalyzerMetricsCollector.State.MUTED);
                        }
                        LOG.info(
                                "Collector {} is still in progress, so skipping this Interval",
                                collector.getCollectorName());
                    }
                }
            }
        }
    }
}