in src/main/java/org/opensearch/performanceanalyzer/collectors/ShardIndexingPressureMetricsCollector.java [97:275]
public void collectMetrics(long startTime) {
if (controller.isCollectorDisabled(configOverridesWrapper, getCollectorName())) {
return;
}
long mCurrT = System.currentTimeMillis();
try {
ClusterService clusterService = OpenSearchResources.INSTANCE.getClusterService();
if (clusterService != null) {
Object indexingPressure =
getField(CLUSTER_SERVICE_CLASS_NAME, INDEXING_PRESSURE_FIELD_NAME)
.get(clusterService);
if (indexingPressure != null) {
Object shardIndexingPressure =
getField(
INDEXING_PRESSURE_CLASS_NAME,
SHARD_INDEXING_PRESSURE_FIELD_NAME)
.get(indexingPressure);
Object shardIndexingPressureStore =
getField(
SHARD_INDEXING_PRESSURE_CLASS_NAME,
SHARD_INDEXING_PRESSURE_STORE_FIELD_NAME)
.get(shardIndexingPressure);
Map<Long, Object> shardIndexingPressureHotStore =
(Map<Long, Object>)
getField(
SHARD_INDEXING_PRESSURE_STORE_CLASS_NAME,
SHARD_INDEXING_PRESSURE_HOT_STORE_FIELD_NAME)
.get(shardIndexingPressureStore);
value.setLength(0);
shardIndexingPressureHotStore.entrySet().stream()
.limit(MAX_HOT_STORE_LIMIT)
.forEach(
storeObject -> {
try {
JSONObject tracker =
(JSONObject)
parser.parse(
mapper.writeValueAsString(
storeObject
.getValue()));
JSONObject shardId =
(JSONObject)
parser.parse(
mapper.writeValueAsString(
tracker.get(
"shardId")));
value.append(
PerformanceAnalyzerMetrics
.getJsonCurrentMilliSeconds())
.append(
PerformanceAnalyzerMetrics
.sMetricNewLineDelimitor);
value.append(
new ShardIndexingPressureStatus(
AllMetrics.IndexingStage
.COORDINATING
.toString(),
shardId.get("indexName")
.toString(),
shardId.get("id")
.toString(),
Long.parseLong(
tracker.get(
"coordinatingRejections")
.toString()),
Long.parseLong(
tracker.get(
"currentCoordinatingBytes")
.toString()),
Long.parseLong(
tracker.get(
"primaryAndCoordinatingLimits")
.toString()),
Double.longBitsToDouble(
Long.parseLong(
tracker.get(
"coordinatingThroughputMovingAverage")
.toString())),
Long.parseLong(
tracker.get(
"lastSuccessfulCoordinatingRequestTimestamp")
.toString()))
.serialize())
.append(
PerformanceAnalyzerMetrics
.sMetricNewLineDelimitor);
value.append(
new ShardIndexingPressureStatus(
AllMetrics.IndexingStage
.PRIMARY
.toString(),
shardId.get("indexName")
.toString(),
shardId.get("id")
.toString(),
Long.parseLong(
tracker.get(
"primaryRejections")
.toString()),
Long.parseLong(
tracker.get(
"currentPrimaryBytes")
.toString()),
Long.parseLong(
tracker.get(
"primaryAndCoordinatingLimits")
.toString()),
Double.longBitsToDouble(
Long.parseLong(
tracker.get(
"primaryThroughputMovingAverage")
.toString())),
Long.parseLong(
tracker.get(
"lastSuccessfulPrimaryRequestTimestamp")
.toString()))
.serialize())
.append(
PerformanceAnalyzerMetrics
.sMetricNewLineDelimitor);
value.append(
new ShardIndexingPressureStatus(
AllMetrics.IndexingStage
.REPLICA
.toString(),
shardId.get("indexName")
.toString(),
shardId.get("id")
.toString(),
Long.parseLong(
tracker.get(
"replicaRejections")
.toString()),
Long.parseLong(
tracker.get(
"currentReplicaBytes")
.toString()),
Long.parseLong(
tracker.get(
"replicaLimits")
.toString()),
Double.longBitsToDouble(
Long.parseLong(
tracker.get(
"replicaThroughputMovingAverage")
.toString())),
Long.parseLong(
tracker.get(
"lastSuccessfulReplicaRequestTimestamp")
.toString()))
.serialize())
.append(
PerformanceAnalyzerMetrics
.sMetricNewLineDelimitor);
} catch (JsonProcessingException | ParseException e) {
LOG.debug(
"Exception raised while parsing string to json object. Skipping IndexingPressureMetricsCollector");
}
});
}
}
if (value.length() != 0) {
saveMetricValues(value.toString(), startTime);
PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
WriterMetrics.SHARD_INDEXING_PRESSURE_COLLECTOR_EXECUTION_TIME,
"",
System.currentTimeMillis() - mCurrT);
}
} catch (Exception ex) {
PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
ExceptionsAndErrors.SHARD_INDEXING_PRESSURE_COLLECTOR_ERROR, "", 1);
LOG.debug(
"Exception in Collecting Shard Indexing Pressure Metrics: {} for startTime {}",
() -> ex.toString(),
() -> startTime);
}
}