in src/main/java/org/opensearch/performanceanalyzer/reader/MetricsEmitter.java [1044:1193]
/**
 * Emits follower-check and leader-check fault detection metrics into the given metrics DB.
 *
 * <p>Four metrics (latency and failure, each for follower and leader checks) are first
 * registered through {@code db.createMetric} with {@code FAULT_DETECTION_TABLE_DIMENSIONS}.
 * Every record of the snapshot's aggregated table is then read: its source and target node ids
 * are stored as dimensions, and the SUM/AVG/MIN/MAX aggregates of the latency and fault fields
 * are written under the follower-check or leader-check metric names, depending on the record's
 * fault detection type. Records of any other type are parsed but not written.
 *
 * <p>The elapsed wall-clock time of the whole call is reported to the reader metrics
 * aggregator and logged at debug level.
 *
 * @param db metrics DB that receives the metrics
 * @param faultDetectionSnapshot snapshot providing the aggregated fault detection table
 */
public static void emitFaultDetectionMetrics(
        MetricsDB db, FaultDetectionMetricsSnapshot faultDetectionSnapshot) {
    final long startTimeMillis = System.currentTimeMillis();
    final Dimensions dimensions = new Dimensions();
    final Result<Record> aggregatedRows = faultDetectionSnapshot.fetchAggregatedTable();

    final String followerLatencyMetric =
            AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_LATENCY.toString();
    final String leaderLatencyMetric =
            AllMetrics.FaultDetectionMetric.LEADER_CHECK_LATENCY.toString();
    final String followerFailureMetric =
            AllMetrics.FaultDetectionMetric.FOLLOWER_CHECK_FAILURE.toString();
    final String leaderFailureMetric =
            AllMetrics.FaultDetectionMetric.LEADER_CHECK_FAILURE.toString();

    // Register the metrics in the same order as before: follower latency, leader latency,
    // follower failure, leader failure.
    final String[] metricNames = {
        followerLatencyMetric, leaderLatencyMetric, followerFailureMetric, leaderFailureMetric
    };
    for (String metricName : metricNames) {
        db.createMetric(new Metric<Double>(metricName, 0d), FAULT_DETECTION_TABLE_DIMENSIONS);
    }

    final String sourceNodeKey = AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString();
    final String targetNodeKey = AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString();
    final String latencyField = FaultDetectionMetricsSnapshot.Fields.LAT.toString();
    final String faultField = FaultDetectionMetricsSnapshot.Fields.FAULT.toString();
    final String checkTypeField =
            FaultDetectionMetricsSnapshot.Fields.FAULT_DETECTION_TYPE.toString();

    for (Record row : aggregatedRows) {
        // The single Dimensions instance is reused; both entries are overwritten per record.
        dimensions.put(sourceNodeKey, row.get(sourceNodeKey).toString());
        dimensions.put(targetNodeKey, row.get(targetNodeKey).toString());

        final Double latencySum =
                readFaultDetectionAggregate(row, latencyField, MetricsDB.SUM);
        final Double latencyAvg =
                readFaultDetectionAggregate(row, latencyField, MetricsDB.AVG);
        final Double latencyMin =
                readFaultDetectionAggregate(row, latencyField, MetricsDB.MIN);
        final Double latencyMax =
                readFaultDetectionAggregate(row, latencyField, MetricsDB.MAX);
        final Double faultSum = readFaultDetectionAggregate(row, faultField, MetricsDB.SUM);
        final Double faultAvg = readFaultDetectionAggregate(row, faultField, MetricsDB.AVG);
        final Double faultMin = readFaultDetectionAggregate(row, faultField, MetricsDB.MIN);
        final Double faultMax = readFaultDetectionAggregate(row, faultField, MetricsDB.MAX);

        // Pick the metric pair that matches the record's check type; anything else is skipped.
        final String checkType = row.get(checkTypeField).toString();
        final String latencyMetric;
        final String failureMetric;
        if (checkType.equals(PerformanceAnalyzerMetrics.FAULT_DETECTION_FOLLOWER_CHECK)) {
            latencyMetric = followerLatencyMetric;
            failureMetric = followerFailureMetric;
        } else if (checkType.equals(PerformanceAnalyzerMetrics.FAULT_DETECTION_LEADER_CHECK)) {
            latencyMetric = leaderLatencyMetric;
            failureMetric = leaderFailureMetric;
        } else {
            continue;
        }

        db.putMetric(
                new Metric<Double>(
                        latencyMetric, latencySum, latencyAvg, latencyMin, latencyMax),
                dimensions,
                0);
        db.putMetric(
                new Metric<Double>(failureMetric, faultSum, faultAvg, faultMin, faultMax),
                dimensions,
                0);
    }

    final long elapsedMillis = System.currentTimeMillis() - startTimeMillis;
    PerformanceAnalyzerApp.READER_METRICS_AGGREGATOR.updateStat(
            ReaderMetrics.FAULT_DETECTION_METRICS_EMITTER_EXECUTION_TIME, "", elapsedMillis);
    LOG.debug(
            "Total time taken for writing fault detection metrics to metricsdb: {}",
            elapsedMillis);
}

/**
 * Reads one aggregate column of a fault detection record as a {@code Double}.
 *
 * <p>The column name is obtained from {@code DBUtils.getAggFieldName(fieldName, aggregation)};
 * the column value's {@code toString()} is then parsed as a double.
 *
 * @param row record to read from
 * @param fieldName name of the underlying field
 * @param aggregation aggregation name passed on to {@code DBUtils.getAggFieldName}
 * @return the parsed aggregate value
 */
private static Double readFaultDetectionAggregate(
        Record row, String fieldName, String aggregation) {
    final Object rawValue = row.get(DBUtils.getAggFieldName(fieldName, aggregation));
    return Double.valueOf(rawValue.toString());
}