in src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/hot_node/GenericResourceRca.java [125:190]
public ResourceFlowUnit<HotResourceSummary> operate() {
counter += 1;
for (MetricFlowUnit flowunit : resourceUsageGroupByConsumer.getFlowUnits()) {
if (flowunit.isEmpty()) {
continue;
}
final Result<Record> result = flowunit.getData();
if (result == null) {
continue;
}
boolean recordParsingError = false;
double totalUsage = 0.0;
for (Record record : result) {
int fieldSize = record.size();
if (fieldSize < 2) {
LOG.error(
"Field size {} is less than 2, the SQL record has wrong data format",
fieldSize);
recordParsingError = true;
break;
}
try {
double num = record.getValue(fieldSize - 1, Double.class);
totalUsage += num;
} catch (DataTypeException de) {
LOG.error(
"Fail to data field from SQL record, field index : {}, trace : {}",
fieldSize - 1,
de.getStackTrace());
recordParsingError = true;
break;
}
}
if (!recordParsingError) {
slidingWindow.next(new SlidingWindowData(this.clock.millis(), totalUsage));
}
}
if (counter == rcaPeriod) {
ResourceContext context = null;
HotResourceSummary summary = null;
// reset the variables
counter = 0;
double avgCpuUsage = slidingWindow.readAvg();
if (!Double.isNaN(avgCpuUsage) && avgCpuUsage > threshold) {
context = new ResourceContext(Resources.State.CONTENDED);
} else {
context = new ResourceContext(Resources.State.HEALTHY);
}
// check to see if the value is above lower bound thres
if (!Double.isNaN(avgCpuUsage) && avgCpuUsage >= lowerBoundThreshold) {
summary =
new HotResourceSummary(
this.resource, threshold, avgCpuUsage, SLIDING_WINDOW_IN_MIN * 60);
addTopConsumerSummary(summary);
}
return new ResourceFlowUnit<>(clock.millis(), context, summary);
} else {
// we return an empty FlowUnit RCA for now. Can change to healthy (or previous known RCA
// state)
return new ResourceFlowUnit<>(clock.millis());
}
}