public void collectMetrics()

in src/main/java/org/opensearch/performanceanalyzer/collectors/ThreadPoolMetricsCollector.java [66:142]


    /**
     * Collects one sample of thread pool statistics and saves it.
     *
     * <p>The sample starts with the value of
     * {@code PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()}, followed by one serialized
     * {@code ThreadPoolStatus} per thread pool, each preceded by
     * {@code PerformanceAnalyzerMetrics.sMetricNewLineDelimitor}. The method returns without
     * doing anything when {@code OpenSearchResources.INSTANCE.getThreadPool()} is {@code null}.
     *
     * @param startTime timestamp of this collection run; it is compared with the timestamp of
     *     each pool's previous record, stored in the new record, and passed on to
     *     {@code saveMetricValues}
     */
    public void collectMetrics(long startTime) {
        // Read the thread pool once so the null check and the use below see the same reference.
        final ThreadPool threadPool = OpenSearchResources.INSTANCE.getThreadPool();
        if (threadPool == null) {
            return;
        }

        Iterator<Stats> statsIterator = threadPool.stats().iterator();
        value.setLength(0);
        value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds());

        while (statsIterator.hasNext()) {
            Stats stats = statsIterator.next();
            String threadPoolName = stats.getName();
            // Read the counter once so the delta and the stored record use the same value.
            long rejected = stats.getRejected();

            long rejectionDelta = computeRejectionDelta(threadPoolName, startTime, rejected);
            statsRecordMap.put(threadPoolName, new ThreadPoolStatsRecord(startTime, rejected));

            int capacity = readQueueCapacity(threadPoolName);

            // NOTE(review): the literal -1.0 is passed through unchanged from the original code;
            // presumably a "not available" placeholder - confirm against ThreadPoolStatus.
            ThreadPoolStatus threadPoolStatus =
                    new ThreadPoolStatus(
                            threadPoolName,
                            stats.getQueue(),
                            rejectionDelta,
                            stats.getThreads(),
                            stats.getActive(),
                            -1.0,
                            capacity);
            value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
                    .append(threadPoolStatus.serialize());
        }
        saveMetricValues(value.toString(), startTime);
    }

    /**
     * Computes how many rejections a pool accumulated since its previous record.
     *
     * @param threadPoolName name of the pool, used as the key into {@code statsRecordMap}
     * @param startTime timestamp of the current collection run
     * @param rejected current rejection count of the pool
     * @return the non-negative difference to the previous record, or 0 when there is no previous
     *     record or the previous record is older than {@code SAMPLING_TIME_INTERVAL * 3}
     */
    private long computeRejectionDelta(String threadPoolName, long startTime, long rejected) {
        ThreadPoolStatsRecord lastRecord = statsRecordMap.get(threadPoolName);
        if (lastRecord == null) {
            return 0;
        }
        // if the timestamp in previous record is greater than 15s (3 * intervals),
        // then the scheduler might hang or freeze due to long GC etc. We simply drop
        // previous record here and report a delta of 0.
        if (startTime - lastRecord.getTimestamp() > SAMPLING_TIME_INTERVAL * 3) {
            return 0;
        }
        // we might not run into a negative delta as rejection is a LongAdder which never
        // decrements its count. regardless, clamp it to 0 to be safe.
        return Math.max(0, rejected - lastRecord.getRejected());
    }

    /**
     * Reads the queue capacity of the executor behind the given thread pool.
     *
     * <p>The {@code threadPool} field of the indices service is read reflectively inside a
     * privileged action. This is best effort: any failure is logged and reported as -1.
     *
     * <p>NOTE(review): the reflective read does not depend on the pool name and is repeated for
     * every pool; it looks hoistable out of the caller's loop - confirm before changing.
     *
     * @param threadPoolName name of the pool whose executor is inspected
     * @return the capacity when the executor is a {@code ThreadPoolExecutor} whose queue is a
     *     {@code SizeBlockingQueue}; -1 otherwise or when the lookup fails
     */
    private int readQueueCapacity(String threadPoolName) {
        return AccessController.doPrivileged(
                (PrivilegedAction<Integer>)
                        () -> {
                            try {
                                ThreadPool threadPool =
                                        (ThreadPool)
                                                FieldUtils.readField(
                                                        OpenSearchResources.INSTANCE
                                                                .getIndicesService(),
                                                        "threadPool",
                                                        true);
                                // Check the type instead of casting blindly: an executor that
                                // is not a ThreadPoolExecutor simply has no capacity to
                                // report, which is not an error.
                                Object executor = threadPool.executor(threadPoolName);
                                if (!(executor instanceof ThreadPoolExecutor)) {
                                    return -1;
                                }
                                Object queue = ((ThreadPoolExecutor) executor).getQueue();
                                // TODO: we might want to read the capacity of
                                // SifiResizableBlockingQueue in the future.
                                // In order to do that we can create a new
                                // PerformanceAnalyzerLibrary package and push all the code
                                // which depends on core OpenSearch specific changes into
                                // that library.
                                if (queue instanceof SizeBlockingQueue) {
                                    return ((SizeBlockingQueue) queue).capacity();
                                }
                            } catch (Exception e) {
                                // Keep the cause so a failing lookup can be diagnosed.
                                LOG.warn("Fail to read queue capacity via reflection", e);
                            }
                            return -1;
                        });
    }