store/engine/raft/store.go (216 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package raft import ( "encoding/json" "errors" "fmt" "os" "slices" "strings" "sync" "github.com/apache/kvrocks-controller/logger" "github.com/apache/kvrocks-controller/store/engine" "go.etcd.io/etcd/pkg/fileutil" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" ) var ErrKeyNotFound = errors.New("key not found") type DataStore struct { walDir string snapshotDir string snapshotter *snap.Snapshotter wal *wal.WAL raftStorage *raft.MemoryStorage mu sync.RWMutex kvs map[string][]byte } func NewDataStore(dir string) *DataStore { snapshotDir := fmt.Sprintf("%s/snapshot", dir) snapshotter := snap.New(logger.Get(), snapshotDir) return &DataStore{ walDir: fmt.Sprintf("%s/wal", dir), snapshotDir: snapshotDir, snapshotter: snapshotter, raftStorage: raft.NewMemoryStorage(), kvs: make(map[string][]byte), } } func (ds *DataStore) walExists() bool { return wal.Exist(ds.walDir) } func (ds *DataStore) loadSnapshotFromDisk() (*raftpb.Snapshot, error) { if !fileutil.Exist(ds.snapshotDir) { if err := os.MkdirAll(ds.snapshotDir, 0750); err != nil { return nil, err } } emptySnapshot := &raftpb.Snapshot{} if !ds.walExists() { return emptySnapshot, nil } snapshots, err := wal.ValidSnapshotEntries(logger.Get(), ds.walDir) if err != nil { return nil, err } latestSnapshot, err := ds.snapshotter.LoadNewestAvailable(snapshots) if err != nil { if errors.Is(err, snap.ErrNoSnapshot) { return emptySnapshot, nil } return nil, err } return latestSnapshot, nil } func (ds *DataStore) reloadSnapshot() error { snapshot, err := ds.snapshotter.Load() if errors.Is(err, snap.ErrNoSnapshot) { return nil } if err != nil { return err } var m map[string][]byte if err := json.Unmarshal(snapshot.Data, &m); err != nil { return err } ds.mu.Lock() ds.kvs = m ds.mu.Unlock() return nil } func (ds *DataStore) openWAL(snapshot *raftpb.Snapshot) (*wal.WAL, error) { if !ds.walExists() { if err := os.MkdirAll(ds.walDir, 0750); err != nil { return nil, err } w, err := wal.Create(logger.Get(), ds.walDir, nil) if err != nil { return nil, err } w.Close() } walSnapshot := walpb.Snapshot{} if snapshot != nil { walSnapshot.Index = snapshot.Metadata.Index walSnapshot.Term = snapshot.Metadata.Term } return wal.Open(logger.Get(), ds.walDir, walSnapshot) } func (ds *DataStore) replayWAL() (*raftpb.Snapshot, error) { snapshot, err := ds.loadSnapshotFromDisk() if err != nil { return nil, fmt.Errorf("failed to load newest snapshot: %w", err) } w, err := ds.openWAL(snapshot) if err != nil { return nil, fmt.Errorf("failed to open WAL: %w", err) } ds.wal = w _, hardState, entries, err := w.ReadAll() if err != nil { return nil, fmt.Errorf("failed to read WAL: %w", err) } if snapshot != nil { _ = ds.raftStorage.ApplySnapshot(*snapshot) } if err := ds.raftStorage.SetHardState(hardState); err != nil { return nil, fmt.Errorf("failed to set hard state: %w", err) } if err := ds.raftStorage.Append(entries); err != nil { return nil, fmt.Errorf("failed to append entries: %w", err) } if err := ds.reloadSnapshot(); err != nil { return nil, fmt.Errorf("failed to reload snapshot: %w", err) } for _, entry := range entries { if err := ds.applyDataEntry(entry); err != nil { return nil, fmt.Errorf("failed to apply data entry: %w", err) } } return snapshot, nil } func (ds *DataStore) saveSnapshot(snapshot raftpb.Snapshot) error { walSnap := walpb.Snapshot{ Index: snapshot.Metadata.Index, Term: snapshot.Metadata.Term, ConfState: &snapshot.Metadata.ConfState, } if err := ds.snapshotter.SaveSnap(snapshot); err != nil { return err } if err := ds.wal.SaveSnapshot(walSnap); err != nil { return err } return ds.wal.ReleaseLockTo(snapshot.Metadata.Index) } func (ds *DataStore) applyDataEntry(entry raftpb.Entry) error { if entry.Type != raftpb.EntryNormal || len(entry.Data) == 0 { return nil } var e Event if err := json.Unmarshal(entry.Data, &e); err != nil { return err } switch e.Op { case opSet: ds.Set(e.Key, e.Value) case opDelete: ds.Delete(e.Key) case opGet: // do nothing default: return fmt.Errorf("unknown operation type: %d", e.Op) } return nil } func (ds *DataStore) Set(key string, value []byte) { ds.mu.Lock() defer ds.mu.Unlock() ds.kvs[key] = value } func (ds *DataStore) Get(key string) ([]byte, error) { ds.mu.RLock() defer ds.mu.RUnlock() if v, ok := ds.kvs[key]; ok { return v, nil } return nil, ErrKeyNotFound } func (ds *DataStore) Delete(key string) { ds.mu.Lock() defer ds.mu.Unlock() delete(ds.kvs, key) } func (ds *DataStore) List(prefix string) []engine.Entry { ds.mu.RLock() defer ds.mu.RUnlock() entries := make([]engine.Entry, 0) for key := range ds.kvs { if !strings.HasPrefix(key, prefix) || key == prefix { continue } trimmedKey := strings.TrimLeft(key[len(prefix)+1:], "/") if strings.ContainsRune(trimmedKey, '/') { continue } entries = append(entries, engine.Entry{ Key: trimmedKey, Value: ds.kvs[trimmedKey], }) } slices.SortFunc(entries, func(i, j engine.Entry) int { return strings.Compare(i.Key, j.Key) }) return entries } func (ds *DataStore) GetDataStoreSnapshot() ([]byte, error) { ds.mu.RLock() defer ds.mu.RUnlock() return json.Marshal(ds.kvs) } func (ds *DataStore) Close() { if ds.wal != nil { ds.wal.Close() } }