controller/controller.go (196 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package controller
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"go.uber.org/zap"
"github.com/apache/kvrocks-controller/config"
"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/store"
)
// Controller lifecycle states, advanced monotonically via atomic
// compare-and-swap: New sets stateInit, Start moves it to stateRunning,
// and Close moves it to stateClosed.
const (
	stateInit = iota + 1
	stateRunning
	stateClosed
)
// Controller reacts to leadership changes and store events, maintaining one
// running ClusterChecker per known cluster while this node is the leader.
type Controller struct {
	config       *config.ControllerConfig
	clusterStore *store.ClusterStore
	mu           sync.Mutex                 // guards clusters
	clusters     map[string]*ClusterChecker // keyed by "<namespace>/<cluster>" (see buildClusterKey)
	wg           sync.WaitGroup             // tracks syncLoop and leaderEventLoop
	state        atomic.Int32               // one of stateInit/stateRunning/stateClosed
	readyCh      chan struct{}              // buffered(1); syncLoop signals once after the initial leader check
	closeCh      chan struct{}              // closed by Close to stop both background loops
}
// New constructs a Controller in its initial state, backed by the given
// cluster store and configuration. Call Start to begin processing events.
func New(s *store.ClusterStore, config *config.ControllerConfig) (*Controller, error) {
	ctrl := &Controller{
		config:       config,
		clusterStore: s,
		clusters:     map[string]*ClusterChecker{},
		readyCh:      make(chan struct{}, 1),
		closeCh:      make(chan struct{}),
	}
	ctrl.state.Store(stateInit)
	return ctrl, nil
}
// Start moves the controller from the initial state into the running state
// and launches its two background loops. It returns nil without doing
// anything when the controller is already running or has been closed.
func (c *Controller) Start(ctx context.Context) error {
	if !c.state.CompareAndSwap(stateInit, stateRunning) {
		return nil
	}
	c.wg.Add(2)
	go c.syncLoop(ctx)
	go c.leaderEventLoop()
	return nil
}
// WaitForReady blocks until syncLoop has completed its initial leader check
// and signaled readiness on readyCh, or until Close closes that channel.
func (c *Controller) WaitForReady() {
	<-c.readyCh
}
// suspend stops the controller from processing events if it's not the leader
func (c *Controller) suspend() {
c.mu.Lock()
for key, cluster := range c.clusters {
cluster.Close()
delete(c.clusters, key)
}
c.mu.Unlock()
}
// resume starts the controller to process events
func (c *Controller) resume(ctx context.Context) error {
namespaces, err := c.clusterStore.ListNamespace(ctx)
if err != nil {
return fmt.Errorf("failed to list namespaces: %w", err)
}
for _, ns := range namespaces {
clusters, err := c.clusterStore.ListCluster(ctx, ns)
if err != nil {
return fmt.Errorf("failed to list clusters: %w", err)
}
for _, cluster := range clusters {
c.addCluster(ns, cluster)
logger.Get().Debug("Resume the cluster", zap.String("namespace", ns), zap.String("cluster", cluster))
}
}
return nil
}
// becomeLeader resumes cluster checking on a leadership acquisition. It is a
// no-op when this node was already the leader in the previous term, since the
// checkers are still running in that case.
func (c *Controller) becomeLeader(ctx context.Context, prevTermLeader string) {
	if c.clusterStore.ID() == prevTermLeader {
		return
	}
	err := c.resume(ctx)
	if err != nil {
		logger.Get().Error("Failed to resume the controller", zap.Error(err))
		return
	}
	logger.Get().Info("Became the leader, resume the controller")
}
// syncLoop tracks leadership transitions. It performs an initial leader check,
// signals readiness exactly once on readyCh, then reacts to LeaderChange
// notifications until closeCh is closed.
//
// prevTermLeader remembers the leader ID of the previous term so that a
// notification that does not actually change this node's role (still leader,
// or never was leader) neither re-resumes nor suspends the checkers.
func (c *Controller) syncLoop(ctx context.Context) {
	defer c.wg.Done()
	prevTermLeader := ""
	if c.clusterStore.IsLeader() {
		c.becomeLeader(ctx, prevTermLeader)
		prevTermLeader = c.clusterStore.ID()
	}
	// readyCh is buffered (capacity 1), so this single send never blocks.
	c.readyCh <- struct{}{}
	for {
		select {
		case <-c.clusterStore.LeaderChange():
			if c.clusterStore.IsLeader() {
				// Resume only on a genuine acquisition, not a same-leader re-election.
				if prevTermLeader != c.clusterStore.ID() {
					c.becomeLeader(ctx, prevTermLeader)
					prevTermLeader = c.clusterStore.ID()
				}
			} else {
				// We were never the leader of the previous term; nothing to suspend.
				if prevTermLeader != c.clusterStore.ID() {
					continue
				}
				c.suspend()
				logger.Get().Warn("Lost the leader, suspend the controller")
			}
		case <-c.closeCh:
			return
		}
	}
}
// leaderEventLoop consumes store notifications and applies cluster
// create/remove/update commands, but only while this node is the leader.
// It runs until closeCh is closed.
func (c *Controller) leaderEventLoop() {
	defer c.wg.Done()
	for {
		select {
		case <-c.closeCh:
			return
		case event := <-c.clusterStore.Notify():
			// Non-leaders and non-cluster events are ignored outright.
			if !c.clusterStore.IsLeader() || event.Type != store.EventCluster {
				continue
			}
			switch event.Command {
			case store.CommandCreate:
				c.addCluster(event.Namespace, event.Cluster)
			case store.CommandRemove:
				c.removeCluster(event.Namespace, event.Cluster)
			case store.CommandUpdate:
				c.updateCluster(event.Namespace, event.Cluster)
			default:
				logger.Get().Error("Unknown command", zap.Any("event", event))
			}
		}
	}
}
// buildClusterKey derives the registry key ("<namespace>/<cluster>") used to
// index the running cluster checkers.
func (c *Controller) buildClusterKey(namespace, clusterName string) string {
	return fmt.Sprintf("%s/%s", namespace, clusterName)
}
// addCluster starts a health checker for the given cluster and registers it,
// doing nothing if a checker is already registered for that key.
//
// The existence check and the insertion happen under one mutex acquisition.
// The previous implementation checked via getCluster (lock released) and then
// re-locked to insert, so two concurrent callers could both pass the check,
// both Start a checker, and the first one inserted would be overwritten and
// leaked (its Close never called).
func (c *Controller) addCluster(namespace, clusterName string) {
	key := c.buildClusterKey(namespace, clusterName)
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.clusters[key]; ok {
		return
	}
	cluster := NewClusterChecker(c.clusterStore, namespace, clusterName).
		WithPingInterval(time.Duration(c.config.FailOver.PingIntervalSeconds) * time.Second).
		WithMaxFailureCount(c.config.FailOver.MaxPingCount)
	cluster.Start()
	c.clusters[key] = cluster
}
// getCluster returns the checker registered for the given cluster, or
// consts.ErrNotFound when no checker is registered under that key.
func (c *Controller) getCluster(namespace, clusterName string) (*ClusterChecker, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	checker, found := c.clusters[c.buildClusterKey(namespace, clusterName)]
	if !found {
		return nil, consts.ErrNotFound
	}
	return checker, nil
}
// removeCluster stops and unregisters the checker for the given cluster;
// silently does nothing when no checker is registered.
func (c *Controller) removeCluster(namespace, clusterName string) {
	key := c.buildClusterKey(namespace, clusterName)
	c.mu.Lock()
	defer c.mu.Unlock()
	checker, found := c.clusters[key]
	if !found {
		return
	}
	checker.Close()
	delete(c.clusters, key)
}
// updateCluster asks the registered checker to re-sync the cluster's
// topology; logs an error when the cluster has no registered checker.
func (c *Controller) updateCluster(namespace, clusterName string) {
	c.mu.Lock()
	checker, found := c.clusters[c.buildClusterKey(namespace, clusterName)]
	c.mu.Unlock()
	if found {
		// Deliver the sync event outside the lock to avoid holding mu while
		// interacting with the checker.
		checker.sendSyncEvent()
		return
	}
	logger.Get().With(
		zap.String("namespace", namespace),
		zap.String("clusterName", clusterName),
	).Error("Cluster not found")
}
// Close shuts the controller down: it stops every cluster checker, signals
// the two background loops to exit, waits for them, and finally releases any
// WaitForReady callers. It is a no-op unless the controller is running.
//
// Ordering matters: closeCh is closed and the loops are joined BEFORE readyCh
// is closed. syncLoop sends on readyCh shortly after starting, so the
// previous order (readyCh closed first) could panic with "send on closed
// channel" when Close raced with startup. Since readyCh is buffered, that
// send completes without a receiver, and closing readyCh afterwards still
// unblocks WaitForReady.
func (c *Controller) Close() {
	if !c.state.CompareAndSwap(stateRunning, stateClosed) {
		return
	}
	c.suspend()
	close(c.closeCh)
	c.wg.Wait()
	close(c.readyCh)
}