in src/main/java/org/opensearch/ad/cluster/HashRing.java [221:319]
private void buildCircles(Set<String> removedNodeIds, Set<String> addedNodeIds, ActionListener<Boolean> actionListener) {
if (buildHashRingSemaphore.availablePermits() != 0) {
throw new AnomalyDetectionException("Must get update hash ring semaphore before building AD hash ring");
}
try {
DiscoveryNode localNode = clusterService.localNode();
if (removedNodeIds != null && removedNodeIds.size() > 0) {
LOG.info("Node removed: {}", Arrays.toString(removedNodeIds.toArray(new String[0])));
for (String nodeId : removedNodeIds) {
ADNodeInfo nodeInfo = nodeAdVersions.remove(nodeId);
if (nodeInfo != null && nodeInfo.isEligibleDataNode()) {
removeNodeFromCircles(nodeId, nodeInfo.getAdVersion());
LOG.info("Remove data node from AD version hash ring: {}", nodeId);
}
}
}
Set<String> allAddedNodes = new HashSet<>();
if (addedNodeIds != null) {
allAddedNodes.addAll(addedNodeIds);
}
if (!nodeAdVersions.containsKey(localNode.getId())) {
allAddedNodes.add(localNode.getId());
}
if (allAddedNodes.size() == 0) {
actionListener.onResponse(true);
// rebuild AD version hash ring with cooldown.
rebuildCirclesForRealtimeAD();
buildHashRingSemaphore.release();
return;
}
LOG.info("Node added: {}", Arrays.toString(allAddedNodes.toArray(new String[0])));
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.nodesIds(allAddedNodes.toArray(new String[0]));
nodesInfoRequest.clear().addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
AdminClient admin = client.admin();
ClusterAdminClient cluster = admin.cluster();
cluster.nodesInfo(nodesInfoRequest, ActionListener.wrap(r -> {
Map<String, NodeInfo> nodesMap = r.getNodesMap();
if (nodesMap != null && nodesMap.size() > 0) {
for (Map.Entry<String, NodeInfo> entry : nodesMap.entrySet()) {
NodeInfo nodeInfo = entry.getValue();
PluginsAndModules plugins = nodeInfo.getInfo(PluginsAndModules.class);
DiscoveryNode curNode = nodeInfo.getNode();
if (plugins == null) {
continue;
}
TreeMap<Integer, DiscoveryNode> circle = null;
for (PluginInfo pluginInfo : plugins.getPluginInfos()) {
if (AD_PLUGIN_NAME.equals(pluginInfo.getName()) || AD_PLUGIN_NAME_FOR_TEST.equals(pluginInfo.getName())) {
Version version = ADVersionUtil.fromString(pluginInfo.getVersion());
boolean eligibleNode = nodeFilter.isEligibleNode(curNode);
if (eligibleNode) {
circle = circles.computeIfAbsent(version, key -> new TreeMap<>());
LOG.info("Add data node to AD version hash ring: {}", curNode.getId());
}
nodeAdVersions.put(curNode.getId(), new ADNodeInfo(version, eligibleNode));
break;
}
}
if (circle != null) {
for (int i = 0; i < VIRTUAL_NODE_COUNT; i++) {
circle.put(Murmur3HashFunction.hash(curNode.getId() + i), curNode);
}
}
}
}
LOG.info("All nodes with known AD version: {}", nodeAdVersions);
// rebuild AD version hash ring with cooldown after all new node added.
rebuildCirclesForRealtimeAD();
if (!dataMigrator.isMigrated() && circles.size() > 0 && circles.lastEntry().getKey().onOrAfter(Version.V_1_1_0)) {
// Find owning node with highest AD version to make sure the data migration logic be compatible to
// latest AD version when upgrade.
Optional<DiscoveryNode> owningNode = getOwningNodeWithHighestAdVersion(DEFAULT_HASH_RING_MODEL_ID);
String localNodeId = localNode.getId();
if (owningNode.isPresent() && localNodeId.equals(owningNode.get().getId())) {
dataMigrator.migrateData();
} else {
dataMigrator.skipMigration();
}
}
buildHashRingSemaphore.release();
hashRingInited.set(true);
actionListener.onResponse(true);
}, e -> {
buildHashRingSemaphore.release();
actionListener.onFailure(e);
LOG.error("Fail to get node info to build AD version hash ring", e);
}));
} catch (Exception e) {
LOG.error("Failed to build AD version circles", e);
buildHashRingSemaphore.release();
actionListener.onFailure(e);
}
}