private void buildCircles()

in src/main/java/org/opensearch/ad/cluster/HashRing.java [221:319]


    /**
     * Updates the AD version hash rings ("circles") for the given membership changes.
     *
     * <p>Processing order:
     * <ol>
     *   <li>Every removed node is dropped from {@code nodeAdVersions}; if it was recorded as an eligible
     *       data node it is also removed from its version circle.</li>
     *   <li>The set of nodes to add is {@code addedNodeIds} plus the local node when the local node is not
     *       yet present in {@code nodeAdVersions}.</li>
     *   <li>If there is nothing to add, the listener is answered with {@code true}, the realtime AD circles
     *       are rebuilt and the semaphore permit is released.</li>
     *   <li>Otherwise plugin info of the added nodes is requested through the nodes info API. For each node
     *       that reports the AD plugin, its AD version and eligibility are stored in {@code nodeAdVersions};
     *       eligible nodes get {@code VIRTUAL_NODE_COUNT} virtual nodes in the circle of their AD version.
     *       Afterwards the realtime AD circles are rebuilt, data migration is triggered or skipped, the permit
     *       is released, {@code hashRingInited} is set and the listener is answered with {@code true}.</li>
     * </ol>
     *
     * <p>The permit of {@code buildHashRingSemaphore} is released at most once per invocation, no matter
     * which combination of success/failure callbacks runs. {@link java.util.concurrent.Semaphore#release()}
     * adds a permit unconditionally, so a second release would leave extra permits behind and make the
     * {@code availablePermits() != 0} precondition below fail for later callers.
     *
     * @param removedNodeIds IDs of nodes that left the cluster; may be {@code null} or empty
     * @param addedNodeIds IDs of nodes that joined the cluster; may be {@code null} or empty
     * @param actionListener notified with {@code true} on success, or with the failure
     * @throws AnomalyDetectionException if the semaphore still has available permits, i.e. the caller did
     *         not acquire it before calling this method
     */
    private void buildCircles(Set<String> removedNodeIds, Set<String> addedNodeIds, ActionListener<Boolean> actionListener) {
        if (buildHashRingSemaphore.availablePermits() != 0) {
            throw new AnomalyDetectionException("Must get update hash ring semaphore before building AD hash ring");
        }
        // Per-invocation guard: whichever path finishes first returns the permit, later paths are no-ops.
        // Fully qualified on purpose so this method does not depend on an import being present.
        final java.util.concurrent.atomic.AtomicBoolean permitReleased = new java.util.concurrent.atomic.AtomicBoolean(false);
        final Runnable releasePermitOnce = () -> {
            if (permitReleased.compareAndSet(false, true)) {
                buildHashRingSemaphore.release();
            }
        };
        try {
            DiscoveryNode localNode = clusterService.localNode();
            if (removedNodeIds != null && removedNodeIds.size() > 0) {
                LOG.info("Node removed: {}", Arrays.toString(removedNodeIds.toArray(new String[0])));
                for (String nodeId : removedNodeIds) {
                    ADNodeInfo nodeInfo = nodeAdVersions.remove(nodeId);
                    // Only eligible data nodes were ever placed on a circle.
                    if (nodeInfo != null && nodeInfo.isEligibleDataNode()) {
                        removeNodeFromCircles(nodeId, nodeInfo.getAdVersion());
                        LOG.info("Remove data node from AD version hash ring: {}", nodeId);
                    }
                }
            }
            Set<String> allAddedNodes = new HashSet<>();

            if (addedNodeIds != null) {
                allAddedNodes.addAll(addedNodeIds);
            }
            // Make sure the local node's AD version is known even if it was not reported as added.
            if (!nodeAdVersions.containsKey(localNode.getId())) {
                allAddedNodes.add(localNode.getId());
            }
            if (allAddedNodes.size() == 0) {
                actionListener.onResponse(true);
                // rebuild AD version hash ring with cooldown.
                rebuildCirclesForRealtimeAD();
                releasePermitOnce.run();
                return;
            }

            LOG.info("Node added: {}", Arrays.toString(allAddedNodes.toArray(new String[0])));
            NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
            nodesInfoRequest.nodesIds(allAddedNodes.toArray(new String[0]));
            // Only plugin information is needed to find each node's AD version.
            nodesInfoRequest.clear().addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());

            AdminClient admin = client.admin();
            ClusterAdminClient cluster = admin.cluster();
            cluster.nodesInfo(nodesInfoRequest, ActionListener.wrap(r -> {
                Map<String, NodeInfo> nodesMap = r.getNodesMap();
                if (nodesMap != null && nodesMap.size() > 0) {
                    for (Map.Entry<String, NodeInfo> entry : nodesMap.entrySet()) {
                        NodeInfo nodeInfo = entry.getValue();
                        PluginsAndModules plugins = nodeInfo.getInfo(PluginsAndModules.class);
                        DiscoveryNode curNode = nodeInfo.getNode();
                        if (plugins == null) {
                            continue;
                        }
                        // Stays null unless the node runs the AD plugin and is an eligible node.
                        TreeMap<Integer, DiscoveryNode> circle = null;
                        for (PluginInfo pluginInfo : plugins.getPluginInfos()) {
                            if (AD_PLUGIN_NAME.equals(pluginInfo.getName()) || AD_PLUGIN_NAME_FOR_TEST.equals(pluginInfo.getName())) {
                                Version version = ADVersionUtil.fromString(pluginInfo.getVersion());
                                boolean eligibleNode = nodeFilter.isEligibleNode(curNode);
                                if (eligibleNode) {
                                    circle = circles.computeIfAbsent(version, key -> new TreeMap<>());
                                    LOG.info("Add data node to AD version hash ring: {}", curNode.getId());
                                }
                                nodeAdVersions.put(curNode.getId(), new ADNodeInfo(version, eligibleNode));
                                break;
                            }
                        }
                        if (circle != null) {
                            // Each physical node is represented by VIRTUAL_NODE_COUNT hashed entries.
                            for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
                                circle.put(Murmur3HashFunction.hash(curNode.getId() + i), curNode);
                            }
                        }
                    }
                }
                LOG.info("All nodes with known AD version: {}", nodeAdVersions);

                // rebuild AD version hash ring with cooldown after all new node added.
                rebuildCirclesForRealtimeAD();

                if (!dataMigrator.isMigrated() && circles.size() > 0 && circles.lastEntry().getKey().onOrAfter(Version.V_1_1_0)) {
                    // Find owning node with highest AD version to make sure the data migration logic be compatible to
                    // latest AD version when upgrade.
                    Optional<DiscoveryNode> owningNode = getOwningNodeWithHighestAdVersion(DEFAULT_HASH_RING_MODEL_ID);
                    String localNodeId = localNode.getId();
                    if (owningNode.isPresent() && localNodeId.equals(owningNode.get().getId())) {
                        dataMigrator.migrateData();
                    } else {
                        dataMigrator.skipMigration();
                    }
                }
                releasePermitOnce.run();
                hashRingInited.set(true);
                // NOTE(review): ActionListener.wrap is expected to forward an exception thrown from this
                // handler to the failure handler below; the release-once guard keeps that path from
                // returning the permit a second time. Confirm against the OpenSearch version in use.
                actionListener.onResponse(true);
            }, e -> {
                releasePermitOnce.run();
                // Log before notifying so the error is recorded even if the listener itself throws.
                LOG.error("Fail to get node info to build AD version hash ring", e);
                actionListener.onFailure(e);
            }));
        } catch (Exception e) {
            LOG.error("Failed to build AD version circles", e);
            releasePermitOnce.run();
            actionListener.onFailure(e);
        }
    }