in src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/HotNodeClusterRca.java [121:209]
/**
 * Detects data nodes whose usage of some resource is an outlier relative to the rest of the
 * cluster.
 *
 * <p>For every resource type currently present as a column of {@code nodeTable}, this method:
 *
 * <ol>
 *   <li>collects the usage reported by each node in {@code dataNodesDetails}, removing from
 *       {@code nodeTable} any entry older than {@code TIMESTAMP_EXPIRATION_IN_MINS} minutes;
 *   <li>skips the resource type unless at least {@code max(2, dataNodesDetails.size() *
 *       NODE_COUNT_THRESHOLD)} unexpired usages are available;
 *   <li>computes the lower median of those usages (for an even count, the smaller of the two
 *       middle values);
 *   <li>flags every node whose value is at least {@code median * (1 +
 *       unbalancedResourceThreshold)} and also at least {@code threshold *
 *       resourceUsageLowerBoundThreshold} for that resource, so that small absolute values are
 *       not reported as noise.
 * </ol>
 *
 * <p>Side effect: expired cells are removed from {@code nodeTable}.
 *
 * @return a flow unit in the HEALTHY state with a {@code null} summary when no node is flagged;
 *     otherwise an UNHEALTHY flow unit whose {@link HotClusterSummary} nests one {@link
 *     HotNodeSummary} per flagged node, each carrying the flagged resource summaries.
 */
private ResourceFlowUnit<HotClusterSummary> checkUnbalancedNode() {
    // NodeID -> HotNodeSummary for every node flagged on at least one resource type.
    Map<String, HotNodeSummary> nodeSummaryMap = new HashMap<>();
    final long currTimestamp = clock.millis();
    final long expirationMillis = TimeUnit.MINUTES.toMillis(TIMESTAMP_EXPIRATION_IN_MINS);
    // Minimum number of unexpired samples needed before the median is meaningful.
    // A median of a single node is meaningless, so never require fewer than 2.
    final int nodeCntThreshold =
            Math.max(2, (int) ((double) dataNodesDetails.size() * NODE_COUNT_THRESHOLD));

    // Snapshot the column keys: evicting expired cells below mutates nodeTable during iteration.
    final List<Resource> resourceTypeColumnKeys =
            ImmutableList.copyOf(nodeTable.columnKeySet());
    for (Resource resourceType : resourceTypeColumnKeys) {
        List<NodeResourceUsage> resourceUsages = new ArrayList<>();
        for (InstanceDetails nodeDetail : dataNodesDetails) {
            final String nodeId = nodeDetail.getInstanceId().toString();
            NodeResourceUsage currentUsage = nodeTable.get(nodeId, resourceType);
            // Some nodes have no entry for this resource type in the table.
            if (currentUsage == null) {
                continue;
            }
            // Evict stale samples so they neither skew the median nor get flagged below.
            if (currTimestamp - currentUsage.timestamp > expirationMillis) {
                nodeTable.row(nodeId).remove(resourceType);
                continue;
            }
            resourceUsages.add(currentUsage);
        }
        // Skip this resource type until enough data nodes have reported fresh summaries.
        if (resourceUsages.size() < nodeCntThreshold) {
            continue;
        }

        // Lower median: for an even count the smaller of the two middle values is used.
        resourceUsages.sort(
                Comparator.comparingDouble(
                        (NodeResourceUsage r) -> r.resourceSummary.getValue()));
        final double median =
                resourceUsages.get((resourceUsages.size() - 1) / 2).resourceSummary.getValue();
        final double outlierBound = median * (1 + unbalancedResourceThreshold);

        // Second pass: flag nodes whose usage of this resource is an outlier.
        // Expired entries were already removed above, so only fresh samples are seen here.
        for (InstanceDetails nodeDetail : dataNodesDetails) {
            final String nodeId = nodeDetail.getInstanceId().toString();
            NodeResourceUsage currentUsage = nodeTable.get(nodeId, resourceType);
            if (currentUsage == null) {
                continue;
            }
            final double value = currentUsage.resourceSummary.getValue();
            // Require both relative imbalance and a non-trivial absolute value, measured
            // against the resource's own threshold, to filter out noise.
            if (value >= outlierBound
                    && value
                            >= currentUsage.resourceSummary.getThreshold()
                                    * resourceUsageLowerBoundThreshold) {
                nodeSummaryMap
                        .computeIfAbsent(
                                nodeId,
                                id ->
                                        new HotNodeSummary(
                                                nodeDetail.getInstanceId(),
                                                nodeDetail.getInstanceIp()))
                        .appendNestedSummary(currentUsage.resourceSummary);
            }
        }
    }

    HotClusterSummary summary = null;
    ResourceContext context;
    if (nodeSummaryMap.isEmpty()) {
        context = new ResourceContext(Resources.State.HEALTHY);
    } else {
        context = new ResourceContext(Resources.State.UNHEALTHY);
        summary = new HotClusterSummary(dataNodesDetails.size(), nodeSummaryMap.size());
        for (HotNodeSummary nodeSummary : nodeSummaryMap.values()) {
            summary.appendNestedSummary(nodeSummary);
        }
    }
    // Use the injected clock, consistent with the expiry check above, rather than wall time.
    return new ResourceFlowUnit<>(clock.millis(), context, summary, true);
}