func()

in controller/cluster.go [196:273]


// parallelProbeNodes probes every node of the cluster concurrently (one
// goroutine per node) and reconciles each node's reported version with
// cluster.Version:
//
//   - nodes that report ErrRestoringBackUp are skipped entirely;
//   - probe errors other than ErrClusterNotInitialized increase the node's
//     failure count; otherwise the failure count is reset after handling;
//   - nodes reporting a version lower than cluster.Version are sent the
//     current cluster info via SyncClusterInfo;
//   - nodes reporting a higher version have their cluster-nodes string
//     fetched. After all probes finish, the string from the highest-version
//     node (if it was fetched successfully) is parsed, given the cluster's
//     name and password, and written through c.clusterStore.UpdateCluster.
//
// All errors are logged; nothing is returned to the caller.
func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store.Cluster) {
	var mu sync.Mutex // guards latestNodeVersion and latestClusterNodesStr
	var latestNodeVersion int64
	var latestClusterNodesStr string
	var wg sync.WaitGroup

	for i, shard := range cluster.Shards {
		for _, node := range shard.Nodes {
			wg.Add(1)
			go func(shardIdx int, n store.Node) {
				defer wg.Done()
				log := logger.Get().With(
					zap.String("id", n.ID()),
					zap.Bool("is_master", n.IsMaster()),
					zap.String("addr", n.Addr()),
				)
				version, err := c.probeNode(ctx, n)
				// Don't sync the cluster info to the node if it is restoring the db from backup
				if errors.Is(err, ErrRestoringBackUp) {
					log.Error("The node is restoring the db from backup")
					return
				}
				if err != nil && !errors.Is(err, ErrClusterNotInitialized) {
					failureCount := c.increaseFailureCount(shardIdx, n)
					log.With(zap.Error(err),
						zap.Int64("failure_count", failureCount),
					).Warn("Failed to probe the node")
					return
				}
				log.Debug("Probe the clusterName node")

				clusterVersion := cluster.Version.Load()
				if version < clusterVersion {
					// The node lags behind: push the current cluster info to it.
					if err := n.SyncClusterInfo(ctx, cluster); err != nil {
						log.With(zap.Error(err)).Error("Failed to sync the clusterName info")
					}
				} else if version > clusterVersion {
					log.With(
						zap.Int64("node.version", version),
						zap.Int64("clusterName.version", clusterVersion),
					).Warn("The node is in a higher version")
					// Fetch before taking the lock so slow nodes don't serialize
					// each other behind the mutex.
					clusterNodesStr, errX := n.GetClusterNodesString(ctx)
					if errX != nil {
						log.With(zap.String("node", n.ID()), zap.Error(errX)).Error("Failed to get the cluster nodes info from node")
						// An empty string records "highest version seen, but its info
						// is unusable". This keeps info from a lower-versioned node
						// from being used instead.
						clusterNodesStr = ""
					}
					mu.Lock()
					switch {
					case version > latestNodeVersion:
						latestNodeVersion = version
						latestClusterNodesStr = clusterNodesStr
					case version == latestNodeVersion && latestClusterNodesStr == "":
						// Another node at the same version failed to report its info
						// first; use this node's info instead of dropping the refresh.
						latestClusterNodesStr = clusterNodesStr
					}
					mu.Unlock()
				}
				c.resetFailureCount(n.ID())
			}(i, node)
		}
	}

	// wg.Wait orders every goroutine's writes before the reads below, so the
	// shared variables can be read here without taking the mutex.
	wg.Wait()
	if latestNodeVersion <= cluster.Version.Load() || latestClusterNodesStr == "" {
		return
	}

	latestClusterInfo, err := store.ParseCluster(latestClusterNodesStr)
	if err != nil {
		logger.Get().With(zap.String("cluster", latestClusterNodesStr), zap.Error(err)).Error("Failed to parse the cluster info")
		return
	}
	latestClusterInfo.Name = cluster.Name

	// Carry the password over from the first node found. Shards[0] may be
	// empty even when other shards hold nodes, so don't index it blindly.
	var password string
	for _, shard := range cluster.Shards {
		if len(shard.Nodes) > 0 {
			password = shard.Nodes[0].Password()
			break
		}
	}
	latestClusterInfo.SetPassword(password)

	if err := c.clusterStore.UpdateCluster(ctx, c.namespace, latestClusterInfo); err != nil {
		logger.Get().With(zap.String("cluster", latestClusterNodesStr), zap.Error(err)).Error("Failed to update the cluster info")
		return
	}
	logger.Get().With(zap.Any("latestClusterInfo", latestClusterInfo)).Info("Refresh latest cluster info to all nodes")
}