public void collectMetrics()

in src/main/java/org/opensearch/performanceanalyzer/collectors/ShardIndexingPressureMetricsCollector.java [97:275]


    public void collectMetrics(long startTime) {
        // Purpose: reflectively walks ClusterService -> IndexingPressure -> ShardIndexingPressure
        // -> store -> hot store, serializes up to MAX_HOT_STORE_LIMIT shard trackers into
        // `value` (three stage records per tracker), and hands the buffer to saveMetricValues.
        //
        // startTime is forwarded unchanged to saveMetricValues and used in the error log.
        // Any exception is counted against SHARD_INDEXING_PRESSURE_COLLECTOR_ERROR and logged at
        // debug level; nothing is rethrown to the caller.
        if (controller.isCollectorDisabled(configOverridesWrapper, getCollectorName())) {
            return;
        }

        long mCurrT = System.currentTimeMillis();
        try {
            // Reset the reused buffer before anything else. Previously this happened only once
            // the indexing-pressure objects had been resolved, so a cycle that found no cluster
            // service (or followed a cycle that failed mid-way) could re-save stale content.
            value.setLength(0);

            ClusterService clusterService = OpenSearchResources.INSTANCE.getClusterService();
            if (clusterService != null) {
                Object indexingPressure =
                        getField(CLUSTER_SERVICE_CLASS_NAME, INDEXING_PRESSURE_FIELD_NAME)
                                .get(clusterService);
                if (indexingPressure != null) {
                    Object shardIndexingPressure =
                            getField(
                                            INDEXING_PRESSURE_CLASS_NAME,
                                            SHARD_INDEXING_PRESSURE_FIELD_NAME)
                                    .get(indexingPressure);
                    Object shardIndexingPressureStore =
                            getField(
                                            SHARD_INDEXING_PRESSURE_CLASS_NAME,
                                            SHARD_INDEXING_PRESSURE_STORE_FIELD_NAME)
                                    .get(shardIndexingPressure);
                    // The field is read reflectively, so its static type is unavailable here;
                    // the cast is unchecked and only the map values are actually used below.
                    @SuppressWarnings("unchecked")
                    Map<Long, Object> shardIndexingPressureHotStore =
                            (Map<Long, Object>)
                                    getField(
                                                    SHARD_INDEXING_PRESSURE_STORE_CLASS_NAME,
                                                    SHARD_INDEXING_PRESSURE_HOT_STORE_FIELD_NAME)
                                            .get(shardIndexingPressureStore);

                    shardIndexingPressureHotStore.values().stream()
                            .limit(MAX_HOT_STORE_LIMIT)
                            .forEach(this::appendShardTrackerMetrics);
                }
            }
            // The execution-time stat is only recorded for cycles that produced output.
            if (value.length() != 0) {
                saveMetricValues(value.toString(), startTime);
                PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
                        WriterMetrics.SHARD_INDEXING_PRESSURE_COLLECTOR_EXECUTION_TIME,
                        "",
                        System.currentTimeMillis() - mCurrT);
            }
        } catch (Exception ex) {
            PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
                    ExceptionsAndErrors.SHARD_INDEXING_PRESSURE_COLLECTOR_ERROR, "", 1);
            LOG.debug(
                    "Exception in Collecting Shard Indexing Pressure Metrics: {} for startTime {}",
                    () -> ex.toString(),
                    () -> startTime);
        }
    }

    /**
     * Serializes one hot-store tracker into {@code value}: a timestamp line followed by one
     * status line each for the coordinating, primary and replica stages.
     *
     * <p>The tracker is converted to JSON via {@code mapper} and re-parsed via {@code parser} so
     * its fields can be read by name. A JSON conversion or parse failure skips this tracker only;
     * any other runtime failure (missing field, non-numeric value) propagates to the caller.
     *
     * @param shardTracker a value taken from the shard indexing pressure hot store
     */
    private void appendShardTrackerMetrics(Object shardTracker) {
        try {
            JSONObject tracker = (JSONObject) parser.parse(mapper.writeValueAsString(shardTracker));
            JSONObject shardId =
                    (JSONObject) parser.parse(mapper.writeValueAsString(tracker.get("shardId")));
            String indexName = shardId.get("indexName").toString();
            String id = shardId.get("id").toString();

            value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds())
                    .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);
            appendStageStatus(
                    AllMetrics.IndexingStage.COORDINATING.toString(),
                    tracker,
                    indexName,
                    id,
                    "coordinatingRejections",
                    "currentCoordinatingBytes",
                    "primaryAndCoordinatingLimits",
                    "coordinatingThroughputMovingAverage",
                    "lastSuccessfulCoordinatingRequestTimestamp");
            appendStageStatus(
                    AllMetrics.IndexingStage.PRIMARY.toString(),
                    tracker,
                    indexName,
                    id,
                    "primaryRejections",
                    "currentPrimaryBytes",
                    "primaryAndCoordinatingLimits",
                    "primaryThroughputMovingAverage",
                    "lastSuccessfulPrimaryRequestTimestamp");
            appendStageStatus(
                    AllMetrics.IndexingStage.REPLICA.toString(),
                    tracker,
                    indexName,
                    id,
                    "replicaRejections",
                    "currentReplicaBytes",
                    "replicaLimits",
                    "replicaThroughputMovingAverage",
                    "lastSuccessfulReplicaRequestTimestamp");
        } catch (JsonProcessingException | ParseException e) {
            // Best effort: skip this tracker, but keep the cause visible in the debug log.
            LOG.debug(
                    "Exception raised while parsing string to json object. Skipping IndexingPressureMetricsCollector",
                    e);
        }
    }

    /**
     * Appends one serialized {@code ShardIndexingPressureStatus} line for a single indexing stage
     * to {@code value}, followed by the metric line delimiter.
     *
     * @param stage stage name written into the status record
     * @param tracker JSON view of the shard tracker the fields are read from
     * @param indexName index name of the shard
     * @param shardId shard id, as a string
     * @param rejectionsKey tracker field holding the stage's rejection count
     * @param currentBytesKey tracker field holding the stage's current bytes
     * @param limitsKey tracker field holding the stage's limit
     * @param throughputKey tracker field holding the throughput moving average, stored as the
     *     raw long bits of a double
     * @param lastSuccessfulTimestampKey tracker field holding the last successful request
     *     timestamp for the stage
     */
    private void appendStageStatus(
            String stage,
            JSONObject tracker,
            String indexName,
            String shardId,
            String rejectionsKey,
            String currentBytesKey,
            String limitsKey,
            String throughputKey,
            String lastSuccessfulTimestampKey) {
        value.append(
                        new ShardIndexingPressureStatus(
                                        stage,
                                        indexName,
                                        shardId,
                                        readLong(tracker, rejectionsKey),
                                        readLong(tracker, currentBytesKey),
                                        readLong(tracker, limitsKey),
                                        Double.longBitsToDouble(readLong(tracker, throughputKey)),
                                        readLong(tracker, lastSuccessfulTimestampKey))
                                .serialize())
                .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor);
    }

    /**
     * Reads {@code key} from {@code json} and parses its string form as a long.
     *
     * @throws NullPointerException if the key is absent
     * @throws NumberFormatException if the value is not a valid long
     */
    private static long readLong(JSONObject json, String key) {
        return Long.parseLong(json.get(key).toString());
    }