in src/main/java/org/opensearch/performanceanalyzer/collectors/FaultDetectionMetricsCollector.java [75:156]
/**
 * Drains the fault detection handler's metrics queue and writes one metric entry per queued
 * event.
 *
 * <p>The method is a no-op when this collector is disabled or when the class named by {@code
 * FAULT_DETECTION_HANDLER_NAME} cannot be loaded. Otherwise every queued string is deserialized
 * into a {@link ClusterFaultDetectionContext} and saved: an entry with an empty start time is
 * written with its finish time and fault under {@code FINISH_FILE_NAME}; any other entry is
 * written with its start time under {@code START_FILE_NAME}.
 *
 * <p>Entries are processed independently. {@code drainTo} has already removed them from the
 * queue, so a failure on one entry is recorded and skipped instead of aborting the batch and
 * losing the remaining entries.
 *
 * @param startTime timestamp forwarded to {@code saveMetricValues} for every written entry
 */
public void collectMetrics(long startTime) {
    if (!controller.isCollectorEnabled(configOverridesWrapper, getCollectorName())) {
        return;
    }
    long mCurrT = System.currentTimeMillis();
    Class<?> faultDetectionHandler;
    try {
        faultDetectionHandler = Class.forName(FAULT_DETECTION_HANDLER_NAME);
    } catch (ClassNotFoundException e) {
        LOG.debug(
                "No Handler Detected for Fault Detection. Skipping FaultDetectionMetricsCollector");
        return;
    }
    try {
        // The cast cannot be checked at runtime because of erasure; the queue is presumably
        // declared as BlockingQueue<String> by the handler - TODO confirm against the handler.
        @SuppressWarnings("unchecked")
        BlockingQueue<String> metricQueue =
                (BlockingQueue<String>)
                        getFaultDetectionHandlerMetricsQueue(faultDetectionHandler).get(null);
        List<String> metrics = new ArrayList<>();
        metricQueue.drainTo(metrics);
        for (String metric : metrics) {
            try {
                ClusterFaultDetectionContext context =
                        mapper.readValue(metric, ClusterFaultDetectionContext.class);
                // "value" is a shared buffer: reset it before building each entry.
                value.setLength(0);
                value.append(PerformanceAnalyzerMetrics.getCurrentTimeMetric());
                addMetricEntry(
                        value,
                        AllMetrics.FaultDetectionDimension.SOURCE_NODE_ID.toString(),
                        context.getSourceNodeId());
                addMetricEntry(
                        value,
                        AllMetrics.FaultDetectionDimension.TARGET_NODE_ID.toString(),
                        context.getTargetNodeId());
                if (StringUtils.isEmpty(context.getStartTime())) {
                    // No start time: treat the entry as a finish event.
                    addMetricEntry(
                            value,
                            AllMetrics.CommonMetric.FINISH_TIME.toString(),
                            context.getFinishTime());
                    addMetricEntry(
                            value, PerformanceAnalyzerMetrics.FAULT, context.getFault());
                    saveMetricValues(
                            value.toString(),
                            startTime,
                            context.getType(),
                            context.getRequestId(),
                            PerformanceAnalyzerMetrics.FINISH_FILE_NAME);
                } else {
                    addMetricEntry(
                            value,
                            AllMetrics.CommonMetric.START_TIME.toString(),
                            context.getStartTime());
                    saveMetricValues(
                            value.toString(),
                            startTime,
                            context.getType(),
                            context.getRequestId(),
                            PerformanceAnalyzerMetrics.START_FILE_NAME);
                }
            } catch (Exception ex) {
                // Isolate the failure to this entry; the rest of the drained batch would
                // otherwise be lost because it is no longer in the queue.
                recordCollectionFailure(ex, startTime, mCurrT);
            }
        }
        PerformanceAnalyzerApp.WRITER_METRICS_AGGREGATOR.updateStat(
                WriterMetrics.FAULT_DETECTION_COLLECTOR_EXECUTION_TIME,
                "",
                System.currentTimeMillis() - mCurrT);
    } catch (Exception ex) {
        // Failures obtaining or draining the queue itself (reflection errors, null queue, ...).
        recordCollectionFailure(ex, startTime, mCurrT);
    }
}

/**
 * Records a collection failure in the errors-and-exceptions aggregator and logs it at debug
 * level.
 *
 * @param ex the failure that occurred
 * @param startTime the start time passed to {@link #collectMetrics(long)}, used in the log line
 * @param collectionStartMillis wall-clock time at which the current collection run began
 */
private void recordCollectionFailure(Exception ex, long startTime, long collectionStartMillis) {
    // NOTE(review): the stat value is the elapsed time in milliseconds, kept from the original
    // code; confirm whether an occurrence count was intended for this error stat.
    PerformanceAnalyzerApp.ERRORS_AND_EXCEPTIONS_AGGREGATOR.updateStat(
            ExceptionsAndErrors.FAULT_DETECTION_COLLECTOR_ERROR,
            "",
            System.currentTimeMillis() - collectionStartMillis);
    LOG.debug(
            "Exception in Collecting FaultDetection Metrics: {} for startTime {}",
            () -> ex.toString(),
            () -> startTime);
}