in controller/cluster.go [196:273]
// parallelProbeNodes probes every node of cluster concurrently (one goroutine
// per node) and reconciles the cluster version between the controller and the
// nodes. It blocks until every probe goroutine has finished.
//
// For each node, after calling c.probeNode:
//   - if the error is ErrRestoringBackUp, the node is skipped entirely (its
//     failure counter is neither increased nor reset);
//   - on any other error except ErrClusterNotInitialized, the node's failure
//     count is increased and the node is skipped;
//   - if the node's version is lower than cluster.Version, the controller's
//     cluster info is pushed to the node with SyncClusterInfo;
//   - if the node's version is higher than cluster.Version, the node's cluster
//     nodes string is fetched and remembered as the refresh candidate, keeping
//     only the candidate with the highest version;
//   - finally the node's failure count is reset.
//
// Once all probes are done, if a node reported a higher version and its cluster
// nodes string was fetched successfully, that string is parsed, given the
// current cluster's name and password, and persisted through
// c.clusterStore.UpdateCluster. Errors are logged; nothing is returned.
func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store.Cluster) {
	// mu guards latestNodeVersion and latestClusterNodesStr, which are written
	// by the probe goroutines and read after wg.Wait().
	var mu sync.Mutex
	var latestNodeVersion int64
	var latestClusterNodesStr string
	var wg sync.WaitGroup

	for i, shard := range cluster.Shards {
		for _, node := range shard.Nodes {
			wg.Add(1)
			go func(shardIdx int, n store.Node) {
				defer wg.Done()

				log := logger.Get().With(
					zap.String("id", n.ID()),
					zap.Bool("is_master", n.IsMaster()),
					zap.String("addr", n.Addr()),
				)

				version, err := c.probeNode(ctx, n)
				// Don't sync the cluster info to the node if it is restoring the db from backup
				if errors.Is(err, ErrRestoringBackUp) {
					log.Error("The node is restoring the db from backup")
					return
				}
				// ErrClusterNotInitialized is tolerated: execution continues to
				// the version comparison below with the version probeNode returned.
				if err != nil && !errors.Is(err, ErrClusterNotInitialized) {
					failureCount := c.increaseFailureCount(shardIdx, n)
					log.With(zap.Error(err),
						zap.Int64("failure_count", failureCount),
					).Warn("Failed to probe the node")
					return
				}
				log.Debug("Probe the clusterName node")

				clusterVersion := cluster.Version.Load()
				switch {
				case version < clusterVersion:
					// sync the clusterName to the latest version
					if err := n.SyncClusterInfo(ctx, cluster); err != nil {
						log.With(zap.Error(err)).Error("Failed to sync the clusterName info")
					}
				case version > clusterVersion:
					log.With(
						zap.Int64("node.version", version),
						zap.Int64("clusterName.version", clusterVersion),
					).Warn("The node is in a higher version")

					mu.Lock()
					// Accept this node when it is strictly newer than the best
					// candidate so far, or when it is at the same version and the
					// earlier fetch at that version failed (empty string). Without
					// the second condition, one failing node at the highest version
					// would prevent a healthy peer at the same version from
					// supplying the topology. A lower-version node never
					// overrides a higher one.
					if version > latestNodeVersion ||
						(version == latestNodeVersion && latestClusterNodesStr == "") {
						latestNodeVersion = version
						clusterNodesStr, errX := n.GetClusterNodesString(ctx)
						if errX != nil {
							log.With(zap.String("node", n.ID()), zap.Error(errX)).Error("Failed to get the cluster nodes info from node")
							// set empty explicitly
							latestClusterNodesStr = ""
						} else {
							latestClusterNodesStr = clusterNodesStr
						}
					}
					mu.Unlock()
				}
				c.resetFailureCount(n.ID())
			}(i, node)
		}
	}
	wg.Wait()

	// Nothing to refresh unless some node is ahead of the controller and its
	// topology was fetched successfully.
	if latestNodeVersion <= cluster.Version.Load() || latestClusterNodesStr == "" {
		return
	}

	latestClusterInfo, err := store.ParseCluster(latestClusterNodesStr)
	if err != nil {
		logger.Get().With(zap.String("cluster", latestClusterNodesStr), zap.Error(err)).Error("Failed to parse the cluster info")
		return
	}
	latestClusterInfo.Name = cluster.Name
	// Take the password from the first node that exists rather than indexing
	// Shards[0].Nodes[0] directly, which panics when the first shard has no
	// nodes. At least one node exists here because latestNodeVersion is only
	// ever set from inside a per-node goroutine.
	for _, shard := range cluster.Shards {
		if len(shard.Nodes) > 0 {
			latestClusterInfo.SetPassword(shard.Nodes[0].Password())
			break
		}
	}
	if err = c.clusterStore.UpdateCluster(ctx, c.namespace, latestClusterInfo); err != nil {
		logger.Get().With(zap.String("cluster", latestClusterNodesStr), zap.Error(err)).Error("Failed to update the cluster info")
		return
	}
	logger.Get().With(zap.Any("latestClusterInfo", latestClusterInfo)).Info("Refresh latest cluster info to all nodes")
}