store/engine/etcd/etcd.go (247 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package etcd import ( "context" "errors" "strings" "sync" "sync/atomic" "time" "github.com/apache/kvrocks-controller/consts" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "go.etcd.io/etcd/pkg/transport" "go.uber.org/zap" "github.com/apache/kvrocks-controller/logger" "github.com/apache/kvrocks-controller/store/engine" ) const ( sessionTTL = 6 defaultDialTimeout = 5 * time.Second ) const defaultElectPath = "/kvrocks/controller/leader" type Config struct { Addrs []string `yaml:"addrs"` Username string `yaml:"username"` Password string `yaml:"password"` TLS struct { Enable bool `yaml:"enable"` CertFile string `yaml:"cert_file"` KeyFile string `yaml:"key_file"` TrustedCAFile string `yaml:"ca_file"` } `yaml:"tls"` ElectPath string `yaml:"elect_path"` } type Etcd struct { client *clientv3.Client kv clientv3.KV leaderMu sync.RWMutex leaderID string myID string electPath string isReady atomic.Bool quitCh chan struct{} wg sync.WaitGroup electionCh chan *concurrency.Election leaderChangeCh chan bool } func New(id string, cfg *Config) (*Etcd, error) { if len(id) == 0 { return nil, errors.New("id must NOT be a empty string") } clientConfig := clientv3.Config{ Endpoints: cfg.Addrs, DialTimeout: defaultDialTimeout, Logger: logger.Get(), } if cfg.TLS.Enable { tlsInfo := transport.TLSInfo{ CertFile: cfg.TLS.CertFile, KeyFile: cfg.TLS.KeyFile, TrustedCAFile: cfg.TLS.TrustedCAFile, } tlsConfig, err := tlsInfo.ClientConfig() if err != nil { return nil, err } clientConfig.TLS = tlsConfig } if cfg.Username != "" && cfg.Password != "" { clientConfig.Username = cfg.Username clientConfig.Password = cfg.Password } client, err := clientv3.New(clientConfig) if err != nil { return nil, err } electPath := defaultElectPath if cfg.ElectPath != "" { electPath = cfg.ElectPath } e := &Etcd{ myID: id, electPath: electPath, client: client, kv: clientv3.NewKV(client), quitCh: make(chan struct{}), electionCh: make(chan *concurrency.Election), leaderChangeCh: make(chan bool), } e.isReady.Store(false) e.wg.Add(2) go e.electLoop(context.Background()) go e.observeLeaderEvent(context.Background()) return e, nil } func (e *Etcd) ID() string { return e.myID } func (e *Etcd) Leader() string { e.leaderMu.RLock() defer e.leaderMu.RUnlock() return e.leaderID } func (e *Etcd) LeaderChange() <-chan bool { return e.leaderChangeCh } func (e *Etcd) IsReady(ctx context.Context) bool { for { select { case <-e.quitCh: return false case <-time.After(100 * time.Millisecond): if e.isReady.Load() { return true } case <-ctx.Done(): return e.isReady.Load() } } } func (e *Etcd) Get(ctx context.Context, key string) ([]byte, error) { rsp, err := e.kv.Get(ctx, key) if err != nil { return nil, err } if len(rsp.Kvs) == 0 { return nil, consts.ErrNotFound } return rsp.Kvs[0].Value, nil } func (e *Etcd) Exists(ctx context.Context, key string) (bool, error) { _, err := e.Get(ctx, key) if err != nil { if errors.Is(err, consts.ErrNotFound) { return false, nil } return false, err } return true, nil } func (e *Etcd) Set(ctx context.Context, key string, value []byte) error { _, err := e.kv.Put(ctx, key, string(value)) return err } func (e *Etcd) Delete(ctx context.Context, key string) error { _, err := e.kv.Delete(ctx, key) return err } func (e *Etcd) List(ctx context.Context, prefix string) ([]engine.Entry, error) { rsp, err := e.kv.Get(ctx, prefix, clientv3.WithPrefix()) if err != nil { return nil, err } prefixLen := len(prefix) entries := make([]engine.Entry, 0) for _, kv := range rsp.Kvs { if string(kv.Key) == prefix { continue } key := strings.TrimLeft(string(kv.Key[prefixLen+1:]), "/") if strings.ContainsRune(key, '/') { continue } entries = append(entries, engine.Entry{ Key: key, Value: kv.Value, }) } return entries, nil } func (e *Etcd) electLoop(ctx context.Context) { defer e.wg.Done() for { select { case <-e.quitCh: return default: } reset: session, err := concurrency.NewSession(e.client, concurrency.WithTTL(sessionTTL)) if err != nil { logger.Get().With( zap.Error(err), ).Error("Failed to create session") time.Sleep(sessionTTL / 3) continue } election := concurrency.NewElection(session, e.electPath) e.electionCh <- election for { if err := election.Campaign(ctx, e.myID); err != nil { logger.Get().With( zap.Error(err), ).Error("Failed to acquire the leader campaign") continue } select { case <-session.Done(): logger.Get().Warn("Leader session is done") goto reset case <-e.quitCh: logger.Get().Info("Exit the leader election loop") return } } } } func (e *Etcd) observeLeaderEvent(ctx context.Context) { defer e.wg.Done() var election *concurrency.Election select { case elect := <-e.electionCh: election = elect case <-e.quitCh: return } ch := election.Observe(ctx) for { select { case resp := <-ch: e.isReady.Store(true) if len(resp.Kvs) > 0 { newLeaderID := string(resp.Kvs[0].Value) e.leaderMu.Lock() e.leaderID = newLeaderID e.leaderMu.Unlock() e.leaderChangeCh <- true if newLeaderID != "" && newLeaderID == e.leaderID { continue } } else { ch = election.Observe(ctx) e.leaderChangeCh <- false } case elect := <-e.electionCh: election = elect ch = election.Observe(ctx) case <-e.quitCh: logger.Get().Info("Exit the leader change observe loop") return } } } func (e *Etcd) Close() error { close(e.quitCh) e.wg.Wait() return e.client.Close() }