in src/main/java/org/opensearch/performanceanalyzer/collectors/ThreadPoolMetricsCollector.java [66:142]
/**
 * Collects one sample of thread pool metrics and passes it to {@code saveMetricValues}.
 *
 * <p>The shared {@code value} buffer is reset, stamped with the current time, and then one
 * serialized {@code ThreadPoolStatus} line is appended per thread pool reported by the node's
 * {@code ThreadPool}. Nothing is collected or saved when the thread pool is not available.
 *
 * <p>Rejections are reported as a delta against the record stored in {@code statsRecordMap} by
 * the previous invocation; the stored record is refreshed on every call.
 *
 * @param startTime timestamp of this collection run; it is compared against the previous
 *     record's timestamp and forwarded to {@code saveMetricValues}
 */
public void collectMetrics(long startTime) {
    // Read the reference once so the null check and the use below see the same object.
    final ThreadPool nodeThreadPool = OpenSearchResources.INSTANCE.getThreadPool();
    if (nodeThreadPool == null) {
        return;
    }

    value.setLength(0);
    value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds());

    for (Stats stats : nodeThreadPool.stats()) {
        final String threadPoolName = stats.getName();
        final long rejected = stats.getRejected();

        long rejectionDelta = 0;
        final ThreadPoolStatsRecord lastRecord = statsRecordMap.get(threadPoolName);
        // If the previous record is older than 3 sampling intervals, the scheduler might
        // have hung or frozen (long GC etc.). In that case the previous record is ignored
        // and the delta stays 0.
        if (lastRecord != null
                && startTime - lastRecord.getTimestamp() <= SAMPLING_TIME_INTERVAL * 3) {
            // The rejection counter is not expected to decrease, but clamp to 0 to be safe.
            rejectionDelta = Math.max(0L, rejected - lastRecord.getRejected());
        }
        statsRecordMap.put(threadPoolName, new ThreadPoolStatsRecord(startTime, rejected));

        final int capacity = readQueueCapacity(threadPoolName);

        // NOTE(review): the sixth argument is always passed as -1.0 here; presumably a
        // "not available" placeholder - confirm against ThreadPoolStatus.
        final ThreadPoolStatus threadPoolStatus =
                new ThreadPoolStatus(
                        threadPoolName,
                        stats.getQueue(),
                        rejectionDelta,
                        stats.getThreads(),
                        stats.getActive(),
                        -1.0,
                        capacity);
        value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
                .append(threadPoolStatus.serialize());
    }

    saveMetricValues(value.toString(), startTime);
}

/**
 * Reads the work queue capacity of the named thread pool's executor.
 *
 * <p>The {@code ThreadPool} instance is obtained by reflectively reading the private
 * {@code threadPool} field of the indices service inside a privileged action. Only queues of
 * type {@code SizeBlockingQueue} report a capacity.
 *
 * @param threadPoolName name of the thread pool whose executor queue is inspected
 * @return the queue capacity, or -1 when the queue is of another type or the lookup fails
 */
private int readQueueCapacity(final String threadPoolName) {
    return AccessController.doPrivileged(
            (PrivilegedAction<Integer>)
                    () -> {
                        try {
                            final ThreadPool reflectedThreadPool =
                                    (ThreadPool)
                                            FieldUtils.readField(
                                                    OpenSearchResources.INSTANCE
                                                            .getIndicesService(),
                                                    "threadPool",
                                                    true);
                            final ThreadPoolExecutor threadPoolExecutor =
                                    (ThreadPoolExecutor)
                                            reflectedThreadPool.executor(threadPoolName);
                            final Object queue = threadPoolExecutor.getQueue();
                            // TODO: we might want to read the capacity of
                            // SifiResizableBlockingQueue in the future. In order to do that
                            // we can create a new PerformanceAnalyzerLibrary package and
                            // push all the code which depends on core OpenSearch specific
                            // changes into that library.
                            if (queue instanceof SizeBlockingQueue) {
                                return ((SizeBlockingQueue<?>) queue).capacity();
                            }
                        } catch (Exception e) {
                            // Best effort: keep collecting, but log the cause so the failure
                            // can be diagnosed.
                            LOG.warn("Fail to read queue capacity via reflection", e);
                        }
                        return -1;
                    });
}