store/engine/zookeeper/zookeeper.go (221 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package zookeeper import ( "context" "errors" "strings" "sync" "sync/atomic" "time" "github.com/apache/kvrocks-controller/logger" "github.com/apache/kvrocks-controller/store/engine" "github.com/go-zookeeper/zk" ) const ( sessionTTL = 6 * time.Second defaultElectPath = "/kvrocks/controller/leader" ) type Config struct { Addrs []string `yaml:"addrs"` Scheme string `yaml:"scheme"` Auth string `yaml:"auth"` ElectPath string `yaml:"elect_path"` } type Zookeeper struct { conn *zk.Conn acl []zk.ACL // We will set this ACL for the node we have created leaderMu sync.RWMutex leaderID string myID string electPath string isReady atomic.Bool quitCh chan struct{} leaderChangeCh chan bool wg sync.WaitGroup } func New(id string, cfg *Config) (*Zookeeper, error) { if len(id) == 0 { return nil, errors.New("id must NOT be a empty string") } conn, _, err := zk.Connect(cfg.Addrs, sessionTTL) if err != nil { return nil, err } electPath := defaultElectPath if cfg.ElectPath != "" { electPath = cfg.ElectPath } acl := zk.WorldACL(zk.PermAll) if cfg.Scheme != "" && cfg.Auth != "" { err := conn.AddAuth(cfg.Scheme, []byte(cfg.Auth)) if err == nil { acl = []zk.ACL{{Perms: zk.PermAll, Scheme: cfg.Scheme, ID: cfg.Auth}} } else { logger.Get().Warn("Zookeeper addAuth fail: " + err.Error()) } } e := &Zookeeper{ myID: id, acl: acl, electPath: electPath, conn: conn, quitCh: make(chan struct{}), leaderChangeCh: make(chan bool), wg: sync.WaitGroup{}, } e.isReady.Store(false) e.wg.Add(1) go e.electLoop(context.Background()) return e, nil } func (e *Zookeeper) ID() string { return e.myID } func (e *Zookeeper) Leader() string { e.leaderMu.RLock() defer e.leaderMu.RUnlock() return e.leaderID } func (e *Zookeeper) LeaderChange() <-chan bool { return e.leaderChangeCh } func (e *Zookeeper) IsReady(ctx context.Context) bool { for { select { case <-e.quitCh: return false case <-time.After(100 * time.Millisecond): if e.isReady.Load() { return true } case <-ctx.Done(): return e.isReady.Load() } } } func (e *Zookeeper) Get(ctx context.Context, key string) ([]byte, error) { data, _, err := e.conn.Get(key) if err != nil { if errors.Is(err, zk.ErrNoNode) { return nil, nil // Key does not exist } return nil, err } return data, nil } func (e *Zookeeper) Exists(ctx context.Context, key string) (bool, error) { exists, _, err := e.conn.Exists(key) if err != nil { return false, err } return exists, nil } // Set sets the value for the key. If the key exists, it will be set; if not, it will be created. func (e *Zookeeper) Set(ctx context.Context, key string, value []byte) error { exist, _ := e.Exists(ctx, key) if exist { _, err := e.conn.Set(key, value, -1) return err } return e.Create(ctx, key, value, 0) } func (e *Zookeeper) Create(ctx context.Context, key string, value []byte, flags int32) error { lastSlashIndex := strings.LastIndex(key, "/") if lastSlashIndex > 0 { substring := key[:lastSlashIndex] // If the parent node does not exist, create the parent node recursively exist, _ := e.Exists(ctx, substring) if !exist { err := e.Create(ctx, substring, []byte{}, 0) if err != nil { return err } } } _, err := e.conn.Create(key, value, flags, e.acl) return err } func (e *Zookeeper) Delete(ctx context.Context, key string) error { err := e.conn.Delete(key, -1) if errors.Is(err, zk.ErrNoNode) { return nil // Key does not exist } return err } func (e *Zookeeper) List(ctx context.Context, prefix string) ([]engine.Entry, error) { children, _, err := e.conn.Children(prefix) if errors.Is(err, zk.ErrNoNode) { return []engine.Entry{}, nil } else if err != nil { return nil, err } entries := make([]engine.Entry, 0) for _, child := range children { key := prefix + "/" + child data, _, err := e.conn.Get(key) if err != nil { return nil, err } entry := engine.Entry{ Key: child, Value: data, } entries = append(entries, entry) } return entries, nil } func (e *Zookeeper) SetleaderID(newLeaderID string) { if newLeaderID != "" && newLeaderID != e.leaderID { if !e.isReady.Load() { // we set ready flag when leaderID first changed e.isReady.Store(true) } e.leaderMu.Lock() e.leaderID = newLeaderID e.leaderMu.Unlock() e.leaderChangeCh <- true } } func (e *Zookeeper) electLoop(ctx context.Context) { defer e.wg.Done() reset: select { case <-e.quitCh: return default: } err := e.Create(ctx, e.electPath, []byte(e.myID), zk.FlagEphemeral) if err != nil && !errors.Is(err, zk.ErrNodeExists) { time.Sleep(sessionTTL / 3) goto reset } data, _, ch, err := e.conn.GetW(e.electPath) if err != nil { time.Sleep(sessionTTL / 3) goto reset } e.SetleaderID(string(data)) for { select { case resp := <-ch: if resp.Type == zk.EventNodeDeleted { err := e.Create(ctx, e.electPath, []byte(e.myID), zk.FlagEphemeral) if err != nil && !errors.Is(err, zk.ErrNodeExists) { time.Sleep(sessionTTL / 3) goto reset } } data, _, ch, err = e.conn.GetW(e.electPath) if err != nil { time.Sleep(sessionTTL / 3) goto reset } e.SetleaderID(string(data)) case <-e.quitCh: logger.Get().Info(e.myID + " Exit the leader election loop") return } } } func (e *Zookeeper) Close() error { close(e.quitCh) e.wg.Wait() e.conn.Close() return nil }