controller/cluster.go

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package controller

import (
	"context"
	"errors"
	"strings"
	"sync"
	"time"

	"go.uber.org/zap"

	"github.com/apache/kvrocks-controller/logger"
	"github.com/apache/kvrocks-controller/store"
)

var (
	ErrClusterNotInitialized = errors.New("CLUSTERDOWN The cluster is not initialized")
	ErrRestoringBackUp       = errors.New("LOADING kvrocks is restoring the db from backup")
)

type ClusterCheckOptions struct {
	pingInterval    time.Duration
	maxFailureCount int64
}

type ClusterChecker struct {
	options      ClusterCheckOptions
	clusterStore store.Store

	clusterMu   sync.Mutex
	cluster     *store.Cluster
	namespace   string
	clusterName string

	failureMu     sync.Mutex
	failureCounts map[string]int64

	syncCh   chan struct{}
	ctx      context.Context
	cancelFn context.CancelFunc
	wg       sync.WaitGroup
}

func NewClusterChecker(s store.Store, ns, cluster string) *ClusterChecker {
	ctx, cancel := context.WithCancel(context.Background())
	c := &ClusterChecker{
		namespace:    ns,
		clusterName:  cluster,
		clusterStore: s,
		options: ClusterCheckOptions{
			pingInterval:    time.Second * 3,
			maxFailureCount: 5,
		},
		failureCounts: make(map[string]int64),
		syncCh:        make(chan struct{}, 1),
		ctx:           ctx,
		cancelFn:      cancel,
	}
	return c
}

func (c *ClusterChecker) Start() {
	c.wg.Add(1)
	go c.probeLoop()
	c.wg.Add(1)
	go c.migrationLoop()
}

func (c *ClusterChecker) WithPingInterval(interval time.Duration) *ClusterChecker {
	c.options.pingInterval = interval
	if c.options.pingInterval < 200*time.Millisecond {
		c.options.pingInterval = 200 * time.Millisecond
	}
	return c
}

func (c *ClusterChecker) WithMaxFailureCount(count int64) *ClusterChecker {
	c.options.maxFailureCount = count
	if c.options.maxFailureCount < 1 {
		c.options.maxFailureCount = 5
	}
	return c
}

func (c *ClusterChecker) probeNode(ctx context.Context, node store.Node) (int64, error) {
	clusterInfo, err := node.GetClusterInfo(ctx)
	if err != nil {
		// We match on the error message with strings.Contains because Kvrocks
		// wrongly returns these errors with an `ERR` prefix. That was fixed in
		// PR: https://github.com/apache/kvrocks/pull/2362, but we still need to
		// be compatible with older versions here.
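		// Map the raw error text onto the sentinel errors declared above so that
		// callers such as parallelProbeNodes can branch on them with errors.Is.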
		if strings.Contains(err.Error(), ErrRestoringBackUp.Error()) {
			return -1, ErrRestoringBackUp
		} else if strings.Contains(err.Error(), ErrClusterNotInitialized.Error()) {
			return -1, ErrClusterNotInitialized
		} else {
			return -1, err
		}
	}
	return clusterInfo.CurrentEpoch, nil
}

func (c *ClusterChecker) increaseFailureCount(shardIndex int, node store.Node) int64 {
	id := node.ID()
	c.failureMu.Lock()
	if _, ok := c.failureCounts[id]; !ok {
		c.failureCounts[id] = 0
	}
	c.failureCounts[id] += 1
	count := c.failureCounts[id]
	c.failureMu.Unlock()

	// don't add the node into the failover candidates if it's not a master node
	if !node.IsMaster() {
		return count
	}

	log := logger.Get().With(
		zap.String("id", node.ID()),
		zap.Bool("is_master", node.IsMaster()),
		zap.String("addr", node.Addr()))
	if count%c.options.maxFailureCount == 0 {
		cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName)
		if err != nil {
			log.Error("Failed to get the cluster info", zap.Error(err))
			return count
		}
		newMasterID, err := cluster.PromoteNewMaster(c.ctx, shardIndex, node.ID(), "")
		if err == nil {
			// the node is normal if it can be elected as the new master,
			// because the election requires the node to be healthy.
			c.resetFailureCount(newMasterID)
			err = c.clusterStore.UpdateCluster(c.ctx, c.namespace, cluster)
		}
		if err != nil {
			log.Error("Failed to promote the new master", zap.Error(err))
		} else {
			log.With(zap.String("new_master_id", newMasterID)).Info("Promoted the new master")
		}
	}
	return count
}

func (c *ClusterChecker) resetFailureCount(nodeID string) {
	c.failureMu.Lock()
	delete(c.failureCounts, nodeID)
	c.failureMu.Unlock()
}

func (c *ClusterChecker) sendSyncEvent() {
	select {
	case c.syncCh <- struct{}{}:
	case <-c.ctx.Done():
		return
	}
}

func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error {
	clusterInfo, err := c.clusterStore.GetCluster(ctx, c.namespace, c.clusterName)
	if err != nil {
		return err
	}
	for _, shard := range clusterInfo.Shards {
		for _, node := range shard.Nodes {
			// sync the cluster info to the latest version
			if err := node.SyncClusterInfo(ctx, clusterInfo); err != nil {
				return err
			}
		}
	}
	return nil
}

func (c *ClusterChecker) parallelProbeNodes(ctx context.Context, cluster *store.Cluster) {
	var mu sync.Mutex
	var latestNodeVersion int64 = 0
	var latestClusterNodesStr string

	var wg sync.WaitGroup
	for i, shard := range cluster.Shards {
		for _, node := range shard.Nodes {
			wg.Add(1)
			go func(shardIdx int, n store.Node) {
				defer wg.Done()
				log := logger.Get().With(
					zap.String("id", n.ID()),
					zap.Bool("is_master", n.IsMaster()),
					zap.String("addr", n.Addr()),
				)
				version, err := c.probeNode(ctx, n)
				// Don't sync the cluster info to the node if it is restoring the db from backup
				if errors.Is(err, ErrRestoringBackUp) {
					log.Error("The node is restoring the db from backup")
					return
				}
				if err != nil && !errors.Is(err, ErrClusterNotInitialized) {
					failureCount := c.increaseFailureCount(shardIdx, n)
					log.With(zap.Error(err),
						zap.Int64("failure_count", failureCount),
					).Warn("Failed to probe the node")
					return
				}
				log.Debug("Probed the cluster node")

				clusterVersion := cluster.Version.Load()
				if version < clusterVersion {
					// sync the cluster info to the latest version
					if err := n.SyncClusterInfo(ctx, cluster); err != nil {
						log.With(zap.Error(err)).Error("Failed to sync the cluster info")
					}
				} else if version > clusterVersion {
					log.With(
						zap.Int64("node.version", version),
						zap.Int64("cluster.version", clusterVersion),
					).Warn("The node is in a higher version")
					mu.Lock()
					if version > latestNodeVersion {
						latestNodeVersion = version
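						// Remember the cluster nodes string reported by the most
						// up-to-date node; it is parsed after wg.Wait() below and
						// written back to the store.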
						clusterNodesStr, errX := n.GetClusterNodesString(ctx)
						if errX != nil {
							log.With(zap.String("node", n.ID()), zap.Error(errX)).Error("Failed to get the cluster nodes info from the node")
							// set empty explicitly
							latestClusterNodesStr = ""
						} else {
							latestClusterNodesStr = clusterNodesStr
						}
					}
					mu.Unlock()
				}
				c.resetFailureCount(n.ID())
			}(i, node)
		}
	}
	wg.Wait()

	if latestNodeVersion > cluster.Version.Load() && latestClusterNodesStr != "" {
		latestClusterInfo, err := store.ParseCluster(latestClusterNodesStr)
		if err != nil {
			logger.Get().With(zap.String("cluster", latestClusterNodesStr), zap.Error(err)).Error("Failed to parse the cluster info")
			return
		}
		latestClusterInfo.Name = cluster.Name
		latestClusterInfo.SetPassword(cluster.Shards[0].Nodes[0].Password())
		err = c.clusterStore.UpdateCluster(ctx, c.namespace, latestClusterInfo)
		if err != nil {
			logger.Get().With(zap.String("cluster", latestClusterNodesStr), zap.Error(err)).Error("Failed to update the cluster info")
			return
		}
		logger.Get().With(zap.Any("latestClusterInfo", latestClusterInfo)).Info("Refreshed the latest cluster info to all nodes")
	}
}

func (c *ClusterChecker) probeLoop() {
	defer c.wg.Done()
	log := logger.Get().With(
		zap.String("namespace", c.namespace),
		zap.String("cluster", c.clusterName),
	)

	probeTicker := time.NewTicker(c.options.pingInterval)
	defer probeTicker.Stop()
	for {
		select {
		case <-probeTicker.C:
			clusterInfo, err := c.clusterStore.GetCluster(c.ctx, c.namespace, c.clusterName)
			if err != nil {
				log.Error("Failed to get the cluster info from the store", zap.Error(err))
				break
			}
			c.clusterMu.Lock()
			c.cluster = clusterInfo
			c.clusterMu.Unlock()
			c.parallelProbeNodes(c.ctx, clusterInfo)
		case <-c.syncCh:
			if err := c.syncClusterToNodes(c.ctx); err != nil {
				log.Error("Failed to sync the cluster to the nodes", zap.Error(err))
			}
		case <-c.ctx.Done():
			return
		}
	}
}

func (c *ClusterChecker) updateCluster(cluster *store.Cluster) {
	c.clusterMu.Lock()
	c.cluster = cluster
	c.clusterMu.Unlock()
}

func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedCluster *store.Cluster) {
	log := logger.Get().With(
		zap.String("namespace", c.namespace),
		zap.String("cluster", c.clusterName))
	for i, shard := range clonedCluster.Shards {
		if !shard.IsMigrating() {
			continue
		}
		sourceNodeClusterInfo, err := shard.GetMasterNode().GetClusterInfo(ctx)
		if err != nil {
			log.Error("Failed to get the cluster info from the source node", zap.Error(err))
			return
		}
		if sourceNodeClusterInfo.MigratingSlot == nil {
			log.Error("The source migration slot is empty",
				zap.String("migrating_slot", shard.MigratingSlot.String()),
			)
			return
		}
		if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot) {
			log.Error("Mismatch migrating slot",
				zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()),
				zap.String("migrating_slot", shard.MigratingSlot.String()),
			)
			return
		}
		if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) {
			log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex))
			return
		}

		switch sourceNodeClusterInfo.MigratingState {
		case "none", "start":
			continue
		case "fail":
			migratingSlot := shard.MigratingSlot
			clonedCluster.Shards[i].ClearMigrateState()
			if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil {
				log.Error("Failed to update the cluster", zap.Error(err))
				return
			}
			c.updateCluster(clonedCluster)
			log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String()))
		case "success":
			clonedCluster.Shards[i].SlotRanges =
				store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, *shard.MigratingSlot)
			clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
				clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, *shard.MigratingSlot,
			)
			migratedSlot := shard.MigratingSlot
			clonedCluster.Shards[i].ClearMigrateState()
			if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil {
				log.Error("Failed to update the cluster", zap.Error(err))
			} else {
				log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String()))
			}
			c.updateCluster(clonedCluster)
		default:
			clonedCluster.Shards[i].ClearMigrateState()
			if err := c.clusterStore.SetCluster(ctx, c.namespace, clonedCluster); err != nil {
				log.Error("Failed to update the cluster", zap.Error(err))
				return
			}
			c.updateCluster(clonedCluster)
			log.Error("Unknown migrating state", zap.String("state", sourceNodeClusterInfo.MigratingState))
		}
	}
}

func (c *ClusterChecker) migrationLoop() {
	defer c.wg.Done()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-c.ctx.Done():
			return
		case <-ticker.C:
			c.clusterMu.Lock()
			if c.cluster == nil {
				c.clusterMu.Unlock()
				continue
			}
			clonedCluster := c.cluster.Clone()
			c.clusterMu.Unlock()
			if clonedCluster == nil {
				continue
			}
			c.tryUpdateMigrationStatus(c.ctx, clonedCluster)
		}
	}
}

func (c *ClusterChecker) Close() {
	c.cancelFn()
	c.wg.Wait()
}
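
// Usage sketch (illustrative only, not part of the controller wiring): it relies
// solely on the constructor and options defined above; the store value `s` and the
// namespace/cluster names are assumptions supplied by the caller.
//
//	checker := NewClusterChecker(s, "ns", "my-cluster").
//		WithPingInterval(time.Second).
//		WithMaxFailureCount(3)
//	checker.Start()
//	defer checker.Close()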