private ResourceFlowUnit checkUnbalancedNode()

in src/main/java/org/opensearch/performanceanalyzer/rca/store/rca/HotNodeClusterRca.java [121:209]


    /**
     * Scans the per-node resource usages held in {@code nodeTable} and flags data nodes whose
     * usage of some resource is an outlier compared with the other data nodes.
     *
     * <p>For every resource type (column key of {@code nodeTable}) this method:
     *
     * <ol>
     *   <li>collects the usage of each node in {@code dataNodesDetails}, removing entries from
     *       the table whose timestamp is older than {@code TIMESTAMP_EXPIRATION_IN_MINS} minutes;
     *   <li>skips the resource type unless at least {@code max(2, (int) (dataNodesDetails.size()
     *       * NODE_COUNT_THRESHOLD))} usages remain;
     *   <li>takes the median of the remaining values (the lower of the two middle values when
     *       the count is even);
     *   <li>records a node as unbalanced when its value is at least {@code median * (1 +
     *       unbalancedResourceThreshold)} and also at least {@code threshold *
     *       resourceUsageLowerBoundThreshold} of that resource summary.
     * </ol>
     *
     * @return a flow unit stamped with the time read from {@code clock} at the start of this
     *     evaluation; its context is {@code HEALTHY} with a {@code null} summary when no node
     *     was flagged, otherwise {@code UNHEALTHY} with a {@link HotClusterSummary} holding one
     *     {@link HotNodeSummary} per flagged node
     */
    private ResourceFlowUnit<HotClusterSummary> checkUnbalancedNode() {
        // NodeID -> HotNodeSummary generated for that node (only nodes that were flagged).
        final Map<String, HotNodeSummary> nodeSummaryMap = new HashMap<>();

        // Single reading of the injected clock; used both for expiry checks and to stamp the
        // resulting flow unit so that the two never disagree.
        final long currTimestamp = clock.millis();
        final long expirationMillis = TimeUnit.MINUTES.toMillis(TIMESTAMP_EXPIRATION_IN_MINS);

        // Minimum number of fresh samples required before a resource type is evaluated.
        // At least 2 nodes are always needed to compare against a median.
        final int nodeCntThreshold =
                Math.max(2, (int) ((double) dataNodesDetails.size() * NODE_COUNT_THRESHOLD));

        // Iterate over a snapshot of the column keys: expired cells are removed from nodeTable
        // inside the loop, which must not disturb the iteration.
        final List<Resource> resourceTypeColumnKeys =
                ImmutableList.copyOf(nodeTable.columnKeySet());
        for (Resource resourceType : resourceTypeColumnKeys) {
            final List<NodeResourceUsage> resourceUsages = new ArrayList<>();
            for (InstanceDetails nodeDetail : dataNodesDetails) {
                final String nodeId = nodeDetail.getInstanceId().toString();
                final NodeResourceUsage currentUsage = nodeTable.get(nodeId, resourceType);
                // Some nodes do not have this resource type in the table.
                if (currentUsage == null) {
                    continue;
                }
                // Drop the value if its timestamp has expired.
                if (currTimestamp - currentUsage.timestamp > expirationMillis) {
                    nodeTable.row(nodeId).remove(resourceType);
                    continue;
                }
                resourceUsages.add(currentUsage);
            }

            // Skip this resource type if not enough summaries were collected from data nodes.
            if (resourceUsages.size() < nodeCntThreshold) {
                continue;
            }

            // Sort and pick the median; for an even count the lower middle element is used.
            resourceUsages.sort(
                    Comparator.comparingDouble(
                            (NodeResourceUsage r) -> r.resourceSummary.getValue()));
            final int medianIdx = (resourceUsages.size() - 1) / 2;
            final double median = resourceUsages.get(medianIdx).resourceSummary.getValue();
            final double outlierLowerBound = median * (1 + unbalancedResourceThreshold);

            // Walk the nodes again and flag those whose value is an outlier. Expired entries
            // were removed above, so they show up as null here and are skipped.
            for (InstanceDetails nodeDetail : dataNodesDetails) {
                final String nodeId = nodeDetail.getInstanceId().toString();
                final NodeResourceUsage currentUsage = nodeTable.get(nodeId, resourceType);
                if (currentUsage == null) {
                    continue;
                }
                final double value = currentUsage.resourceSummary.getValue();
                // The value must be an outlier relative to the median AND large enough in
                // absolute terms (a fraction of the resource's own threshold) to filter out
                // noise from lightly used resources.
                final boolean isOutlier = value >= outlierLowerBound;
                final boolean isSignificant =
                        value
                                >= currentUsage.resourceSummary.getThreshold()
                                        * resourceUsageLowerBoundThreshold;
                if (isOutlier && isSignificant) {
                    nodeSummaryMap
                            .computeIfAbsent(
                                    nodeId,
                                    id ->
                                            new HotNodeSummary(
                                                    nodeDetail.getInstanceId(),
                                                    nodeDetail.getInstanceIp()))
                            .appendNestedSummary(currentUsage.resourceSummary);
                }
            }
        }

        // Build the cluster-level summary only when at least one node was flagged.
        final ResourceContext context;
        HotClusterSummary summary = null;
        if (nodeSummaryMap.isEmpty()) {
            context = new ResourceContext(Resources.State.HEALTHY);
        } else {
            context = new ResourceContext(Resources.State.UNHEALTHY);
            summary = new HotClusterSummary(dataNodesDetails.size(), nodeSummaryMap.size());
            for (HotNodeSummary nodeSummary : nodeSummaryMap.values()) {
                summary.appendNestedSummary(nodeSummary);
            }
        }
        return new ResourceFlowUnit<>(currTimestamp, context, summary, true);
    }