internal/gitaly/storage/raftmgr/replica_log_store.go

package raftmgr

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"

	"github.com/dgraph-io/badger/v4"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/mode"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr/partition/log"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal"
	lg "gitlab.com/gitlab-org/gitaly/v16/internal/log"
	"gitlab.com/gitlab-org/gitaly/v16/internal/safe"
	"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
	"google.golang.org/protobuf/proto"
)

// InitStatus represents the initialization status of the Raft log store.
type InitStatus int

const (
	// InitStatusUnknown indicates an unknown initialization status, typically due to an error.
	InitStatusUnknown InitStatus = iota
	// InitStatusUnbootstrapped indicates that the Raft log store has not been bootstrapped yet
	// and contains no log entries. This is the state of a completely new installation where
	// no data exists and the Raft group needs to be initialized for the first time.
	InitStatusUnbootstrapped
	// InitStatusNeedsBackfill indicates that Raft is being enabled for an existing partition that
	// already contains log entries. In this state, the system needs to backfill Raft metadata
	// for these pre-existing entries before bootstrapping the Raft group. This happens when
	// migrating a non-Raft log store to use Raft consensus.
	InitStatusNeedsBackfill
	// InitStatusBootstrapped indicates that the Raft log store has been previously bootstrapped.
	// This means a hard state exists and the Raft group has been initialized before.
	InitStatusBootstrapped
)

var (
	// RaftCommittedPosition tracks the highest log entry known to be committed in the Raft consensus. This position
	// is used by the log manager for cleaning up committed entries and maintaining cluster consistency.
	RaftCommittedPosition = storage.PositionType{Name: "RaftCommittedPosition", ShouldNotify: false}
	// RaftSnapshotPosition tracks the latest log entry included in a Raft snapshot. This position enables efficient
	// log truncation and recovery by indicating which entries can be safely removed from the write-ahead log.
	// Currently, this position matches the committed position as auto-compaction is not yet implemented. This will
	// change with the implementation of auto-compaction in the following issue:
	// https://gitlab.com/gitlab-org/gitaly/-/issues/6463
	RaftSnapshotPosition = storage.PositionType{Name: "RaftSnapshotPosition", ShouldNotify: false}

	// KeyHardState is the database key for storing the member's current Raft hard state.
	// This state must be persisted before sending any messages to ensure consistency.
	KeyHardState = []byte("raft/hard_state")
	// KeyConfState is the database key for storing the current Raft configuration state.
	// This state represents the cluster membership and is used when generating snapshots.
	KeyConfState = []byte("raft/conf_state")
	// KeyLastConfigChange denotes the LSN of the last config change entry.
	KeyLastConfigChange = []byte("raft/latest_config_change")
	// RaftDBKeys contains the list of Raft-related DB keys.
	RaftDBKeys = [][]byte{KeyHardState, KeyConfState, KeyLastConfigChange}
)

// ReplicaLogStore implements the raft.Storage interface and manages the persistence of Raft state
// in coordination with etcd/raft and log.Manager.
//
// During the lifecycle of etcd/raft, the library requests the application to persist two types of data:
//  1. Log entries to replicate to other members, along with additional Raft-specific metadata
//     (e.g., hard state, config state).
//  2. The latest state of the Raft group, such as hard state, config state, snapshot state, etc.
//
// To persist the first type of data, ReplicaLogStore writes an additional RAFT file encapsulating
// this metadata to the staging directory of the target entry. It then coordinates with log.Manager
// to finalize the log appending operation. Later, the etcd/raft library retrieves the log entry and
// its associated Raft metadata. The second type of data is persisted in the key-value database.
//
// Log entry directory without Raft:
//
//	|_ MANIFEST -> gitalypb.LogEntry
//	|_ 1
//	|_ 2
//	|_ ...
//
// Log entry directory with Raft:
//
//	|_ MANIFEST -> gitalypb.LogEntry
//	|_ RAFT -> raftpb.Entry
//	|_ 1
//	|_ 2
//	|_ ...
//
// In the hard state provided by the etcd/raft state machine, the committed index (let's call it
// committedLSN) is crucial. It tracks the latest LSN (Log Sequence Number) acknowledged by
// the Raft group's quorum. This index complements the existing appendedLSN managed by log.Manager.
// When a log entry is appended, it goes through two distinct stages:
//  1. Appending to the local WAL (via log.Manager) and receiving an associated LSN. At this stage,
//     the log entry is persisted but cannot yet be applied.
//  2. The leader sends the log entry to each member of the Raft group. If the quorum
//     acknowledges that they have persisted the entry, the leader marks it as "committed." At this
//     point, the entry is ready to be applied by the leader. Followers will apply it after
//     receiving the next update from the leader.
//
// With the introduction of committedLSN, ReplicaLogStore proxies certain functionalities of
// log.Manager, particularly log consumption. In log.Manager, the consumer is notified
// immediately after a log entry is appended to the local WAL. However, with Raft, the
// consumer is notified only when the log entry is committed.
/*
	            ┌─ Last Raft snapshot taken
	            │       ┌─ Consumer not acknowledged
	            │       │       ┌─ Applied up to this point
	            │       │       │   committedLSN           appendedLSN
	            │       │       │       │                       │
	┌─┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌▼┐ ┌─┐ ┌─┐ ┌─┐ ┌▼┐
	└─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘ └─┘
	◄───────────►                           ◄───────────►
	  Can remove                    Need confirmation from quorum
	                                Not ready to be used
*/
type ReplicaLogStore struct {
	ctx context.Context

	mutex         sync.Mutex
	authorityName string
	partitionID   storage.PartitionID
	database      keyvalue.Transactioner
	localLog      *log.Manager
	committedLSN  storage.LSN
	lastTerm      uint64
	consumer      storage.LogConsumer
	stagingDir    string
	snapshotter   ReplicaSnapshotter

	// hooks is a collection of hooks, used in test environments to intercept critical events.
	hooks replicaHooks
}
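// The sketch below is illustrative only and is not part of the production code. Assuming a simplified
// etcd/raft Ready loop (the real driver lives outside this file; node, store, and pathOfStagedEntry are
// placeholders), it shows how the two-stage lifecycle described above maps onto this store: entries are
// first persisted locally via insertLogEntry, and only a later hard state carrying a higher Commit value
// advances committedLSN and notifies the consumer.
//
//	// Hypothetical driver loop; error handling omitted for brevity.
//	for rd := range node.Ready() {
//		for _, entry := range rd.Entries {
//			// Stage 1: append to the local WAL together with the RAFT metadata file.
//			_ = store.insertLogEntry(entry, pathOfStagedEntry(entry))
//		}
//		if !raft.IsEmptyHardState(rd.HardState) {
//			// Stage 2: persisting the hard state advances committedLSN and, in turn,
//			// notifies the consumer about newly committed entries.
//			_ = store.saveHardState(rd.HardState)
//		}
//		node.Advance()
//	}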
// raftManifestPath returns the path to the manifest file within a log entry directory. The manifest file contains
// metadata about the Raft log entry, such as its term, index, the marshaled content of gitalypb.RaftEntry, and so on.
// This file is stored in the log entry directory alongside the MANIFEST file. It is created as part of the log
// appending operation. The etcd/raft library requires the application to persist this metadata to enable later
// retrieval.
func raftManifestPath(logEntryPath string) string {
	return filepath.Join(logEntryPath, "RAFT")
}

// NewReplicaLogStore creates and initializes a new Storage instance.
func NewReplicaLogStore(
	authorityName string,
	partitionID storage.PartitionID,
	raftCfg config.Raft,
	db keyvalue.Transactioner,
	stagingDirectory string,
	stateDirectory string,
	consumer storage.LogConsumer,
	positionTracker *log.PositionTracker,
	logger lg.Logger,
	metrics *Metrics,
) (*ReplicaLogStore, error) {
	if err := positionTracker.Register(RaftCommittedPosition); err != nil {
		return nil, fmt.Errorf("registering committed position: %w", err)
	}
	if err := positionTracker.Register(RaftSnapshotPosition); err != nil {
		return nil, fmt.Errorf("registering snapshot position: %w", err)
	}

	// Initialize the local log manager without a consumer since notifications
	// should only be sent when entries are committed, not when they're appended.
	localLog := log.NewManager(
		authorityName,
		partitionID,
		stagingDirectory,
		stateDirectory,
		nil,
		positionTracker,
	)

	logger = logger.WithFields(lg.Fields{
		"partition_id": partitionID,
		"storage_name": authorityName,
	})
	snapshotter, err := NewReplicaSnapshotter(raftCfg, logger, metrics.Scope(authorityName))
	if err != nil {
		return nil, fmt.Errorf("create raft snapshotter: %w", err)
	}

	return &ReplicaLogStore{
		database:      db,
		authorityName: authorityName,
		partitionID:   partitionID,
		localLog:      localLog,
		consumer:      consumer,
		stagingDir:    stagingDirectory,
		snapshotter:   snapshotter,
		hooks:         noopHooks(),
	}, nil
}

// initialize loads all states from DB and disk. It also checks whether the leader has completed its initial bootstrap
// process by verifying the existence of a saved hard state.
func (s *ReplicaLogStore) initialize(ctx context.Context, appliedLSN storage.LSN) (InitStatus, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.ctx = ctx

	if err := s.localLog.Initialize(ctx, appliedLSN); err != nil {
		return InitStatusUnknown, fmt.Errorf("initializing local log manager: %w", err)
	}

	// Try to load the previous Raft hard state.
	status := InitStatusUnknown
	var hardState raftpb.HardState
	if err := s.readKey(KeyHardState, func(value []byte) error {
		return hardState.Unmarshal(value)
	}); err == nil {
		// If there's a previously persisted hard state, the cluster was bootstrapped. The storage loads persisted
		// data and resumes from there.
		s.committedLSN = storage.LSN(hardState.Commit)
		s.lastTerm = hardState.Term
		status = InitStatusBootstrapped
	} else if errors.Is(err, badger.ErrKeyNotFound) {
		// No previous state exists - this is a fresh installation. As the Raft feature is optional, there's
		// a chance that Raft is enabled for an existing partition. When so, all appended log entries
		// are considered to be committed. The Raft log store needs to backfill Raft manifests for such entries.
		s.committedLSN = s.localLog.AppendedLSN()
		if s.committedLSN == 0 {
			status = InitStatusUnbootstrapped
		} else {
			status = InitStatusNeedsBackfill
		}
	} else {
		return status, err
	}

	// This loop iterates through all existing log entries to ensure each has the required Raft metadata.
	// When initializing a Raft system on an existing partition, each entry needs Raft-specific metadata.
	// This backfilling process occurs when migrating from a non-Raft to a Raft-enabled system to ensure
	// existing data remains accessible without requiring a full data migration or risking data loss.
	// For now, we backfill existing entries regardless of initialization status. While it's not ideal, Gitaly
	// supports toggling Raft support multiple times. When the Raft feature becomes stable in the future, we
	// should only backfill if the init status is InitStatusNeedsBackfill.
	for lsn := s.localLog.LowWaterMark(); lsn <= s.localLog.AppendedLSN(); lsn++ {
		// Check if the current log entry already has Raft metadata.
		exist, err := s.hasRaftEntry(lsn)
		if err != nil {
			return InitStatusUnknown, fmt.Errorf("checking raft manifest existence: %w", err)
		}
		if exist {
			continue
		}

		logEntryPath := s.localLog.GetEntryPath(lsn)
		message := &gitalypb.RaftEntry{
			Data: &gitalypb.RaftEntry_LogData{
				LocalPath: []byte(logEntryPath),
			},
		}
		data, err := proto.Marshal(message)
		if err != nil {
			return InitStatusUnknown, fmt.Errorf("marshaling Raft message: %w", err)
		}

		// Write the Raft metadata alongside the existing log entry. This effectively "converts" regular log
		// entries into Raft-managed entries by associating them with the current term and their sequential
		// index.
		if err := s.writeRaftEntry(raftpb.Entry{
			Term:  s.lastTerm,
			Index: uint64(lsn),
			Type:  raftpb.EntryNormal,
			Data:  data,
		}, logEntryPath); err != nil {
			return InitStatusUnknown, fmt.Errorf("backfilling Raft metadata: %w", err)
		}
	}

	if s.committedLSN != 0 {
		// Update both commit and snapshot positions to match the last committed LSN.
		if err := s.localLog.AcknowledgePosition(RaftCommittedPosition, s.committedLSN); err != nil {
			return InitStatusUnknown, fmt.Errorf("acknowledging committed position: %w", err)
		}
		if err := s.localLog.AcknowledgePosition(RaftSnapshotPosition, s.committedLSN); err != nil {
			return InitStatusUnknown, fmt.Errorf("acknowledging snapshot position: %w", err)
		}
		if s.consumer != nil {
			s.consumer.NotifyNewEntries(s.authorityName, s.partitionID, s.localLog.LowWaterMark(), s.committedLSN)
		}
	}

	return status, nil
}
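// Illustrative recap of the initialization outcomes above, not part of the production code. Assuming a
// partition whose WAL already holds entries 1..3 and which has no persisted hard state, initialize
// reports InitStatusNeedsBackfill, treats LSN 3 as committed, and backfills a RAFT file next to each
// existing MANIFEST roughly as follows (term 0, since no hard state was found):
//
//	// Hypothetical values shown for LSN 2 only.
//	raftpb.Entry{
//		Term:  0, // s.lastTerm: unknown before the first bootstrap
//		Index: 2, // the entry's LSN
//		Type:  raftpb.EntryNormal,
//		Data:  data, // marshaled gitalypb.RaftEntry pointing at the entry's local path
//	}
//
// A partition with no entries and no hard state is InitStatusUnbootstrapped, while a persisted hard
// state always takes precedence and yields InitStatusBootstrapped.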
func (s *ReplicaLogStore) close() error {
	return s.localLog.Close()
}

// Entries implements raft.Storage's Entries(). It returns the list of entries still managed by the log
// in the range [lo, hi).
func (s *ReplicaLogStore) Entries(lo uint64, hi uint64, maxSize uint64) ([]raftpb.Entry, error) {
	firstLSN := uint64(s.localLog.LowWaterMark())
	lastLSN := uint64(s.localLog.AppendedLSN())

	if lo < firstLSN {
		return nil, raft.ErrCompacted
	}
	if firstLSN > lastLSN {
		return nil, raft.ErrUnavailable
	}
	if hi > lastLSN+1 {
		return nil, fmt.Errorf("reading out-of-bound entries %d > %d", hi, lastLSN+1)
	}

	boundary := hi - 1
	if maxSize != 0 {
		boundary = min(lo+maxSize-1, hi-1)
	}

	var entries []raftpb.Entry
	for lsn := lo; lsn <= boundary; lsn++ {
		entry, err := s.readRaftEntry(storage.LSN(lsn))
		if err != nil {
			return nil, fmt.Errorf("reading raft entry: %w", err)
		}
		entries = append(entries, entry)
	}
	return entries, nil
}
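// A worked example of the boundary calculation above, for illustration only. With firstLSN = 1,
// lastLSN = 10, and a call Entries(3, 8, 2), all bound checks pass and maxSize caps the result:
//
//	boundary := min(3+2-1, 8-1) // = 4
//	// The loop reads LSNs 3 and 4, so two entries are returned even though [3, 8) contains five.
//
// With maxSize = 0, the boundary stays at hi-1 and the full half-open range [lo, hi) is read. Note that
// maxSize is interpreted here as a number of entries.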
// InitialState retrieves the initial Raft HardState and ConfState from persistent storage. It is used to initialize
// the Raft state machine with the previously saved state.
func (s *ReplicaLogStore) InitialState() (raftpb.HardState, raftpb.ConfState, error) {
	hardState, err := s.readHardState()
	if err != nil {
		return raftpb.HardState{}, raftpb.ConfState{}, fmt.Errorf("reading hard state: %w", err)
	}
	confState, err := s.readConfState()
	if err != nil {
		return raftpb.HardState{}, raftpb.ConfState{}, fmt.Errorf("reading conf state: %w", err)
	}
	return hardState, confState, nil
}

// LastIndex returns the last index of all entries currently available in the log.
// This corresponds to the last LSN in the write-ahead log.
func (s *ReplicaLogStore) LastIndex() (uint64, error) {
	return uint64(s.localLog.AppendedLSN()), nil
}

// FirstIndex returns the first index of all entries currently available in the log.
// This corresponds to the first LSN in the write-ahead log.
func (s *ReplicaLogStore) FirstIndex() (uint64, error) {
	return uint64(s.localLog.LowWaterMark()), nil
}

// Snapshot returns the latest snapshot of the state machine. As the auto-compaction feature is not yet supported,
// this method always returns raft.ErrSnapshotTemporarilyUnavailable.
// For more information: https://gitlab.com/gitlab-org/gitaly/-/issues/6463
func (s *ReplicaLogStore) Snapshot() (raftpb.Snapshot, error) {
	return raftpb.Snapshot{}, raft.ErrSnapshotTemporarilyUnavailable
}

// TriggerSnapshot starts the process of taking a snapshot of the partition's disk.
func (s *ReplicaLogStore) TriggerSnapshot(ctx context.Context, appliedLSN storage.LSN, lastTerm uint64) (*ReplicaSnapshot, error) {
	// Prevent multiple snapshotters from running at the same time.
	s.snapshotter.Lock()
	defer s.snapshotter.Unlock()

	// Get the transaction from the context to reuse the same snapshot.
	tx := storage.ExtractTransaction(ctx)
	if tx == nil {
		return nil, structerr.NewInternal("raft snapshotter: transaction not initialized")
	}

	// Snapshot metadata is important to track which logs should be applied after snapshot restoration.
	snapshot, err := s.snapshotter.materializeSnapshot(ReplicaSnapshotMetadata{index: appliedLSN, term: lastTerm}, tx)
	if err != nil {
		return nil, fmt.Errorf("materialize snapshot: %w", err)
	}
	return snapshot, nil
}

// Term returns the term of the entry at a given index.
func (s *ReplicaLogStore) Term(i uint64) (uint64, error) {
	firstLSN := uint64(s.localLog.LowWaterMark())
	lastLSN := uint64(s.localLog.AppendedLSN())

	if i > lastLSN {
		return 0, raft.ErrUnavailable
	} else if i < firstLSN {
		// This also means lastLSN < firstLSN. There are two scenarios that lead to this condition:
		// - The WAL is completely empty, likely because the Raft group hasn't been bootstrapped. In this case,
		//   this method can simply return 0.
		// - All log entries have been pruned after a restart.
		//
		// The second scenario is more complex. The Raft state machine frequently queries the term of the latest
		// log entry, especially after etcd/raft's node restarts. In theory, the content of the latest log entry
		// must be preserved even after being processed. However, this approach is impractical. It could cause
		// inactive partitions to retain log entries indefinitely until new entries are received.
		//
		// To address this, the term of the last log entry is maintained in memory. Its value is derived from
		// the persisted hard state when the Raft replica restarts. After a new entry is persisted, the value is
		// updated.
		if i == lastLSN {
			return s.lastTerm, nil
		}
		return 0, raft.ErrCompacted
	}

	raftEntry, err := s.readRaftEntry(storage.LSN(i))
	if err != nil {
		return 0, fmt.Errorf("read log entry term: %w", err)
	}
	return raftEntry.Term, nil
}
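// A worked example of Term's three regions, for illustration only. Suppose firstLSN = 5 and
// lastLSN = 9 (store is assumed to be an initialized *ReplicaLogStore):
//
//	term, err := store.Term(12) // err == raft.ErrUnavailable: beyond the appended LSN
//	term, err = store.Term(3)   // err == raft.ErrCompacted: already pruned
//	term, err = store.Term(7)   // read from the entry's RAFT file on disk
//
// After every entry has been pruned (firstLSN = 10, lastLSN = 9), Term(9) falls into the i < firstLSN
// branch but still answers from the in-memory lastTerm, which is exactly the restart scenario the
// comment above describes.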
func (s *ReplicaLogStore) readCommittedLSN() storage.LSN {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	return s.committedLSN
}

// setKey stores the given value in the database under the given key.
func (s *ReplicaLogStore) setKey(key []byte, value []byte) error {
	return s.database.Update(func(tx keyvalue.ReadWriter) error {
		return tx.Set(key, value)
	})
}

// readKey reads a key from the database and unmarshals its value into the destination protocol
// buffer message.
func (s *ReplicaLogStore) readKey(key []byte, unmarshal func([]byte) error) error {
	return s.database.View(func(txn keyvalue.ReadWriter) error {
		item, err := txn.Get(key)
		if err != nil {
			return fmt.Errorf("get: %w", err)
		}
		return item.Value(unmarshal)
	})
}

// saveHardState persists the member's current Raft hard state to disk, ensuring that state changes are durable.
func (s *ReplicaLogStore) saveHardState(hardState raftpb.HardState) error {
	marshaled, err := hardState.Marshal()
	if err != nil {
		return fmt.Errorf("marshaling hard state: %w", err)
	}

	committedLSN := storage.LSN(hardState.Commit)
	if err := func() error {
		s.hooks.BeforeSaveHardState()

		s.mutex.Lock()
		defer s.mutex.Unlock()

		if committedLSN > s.localLog.AppendedLSN() {
			return fmt.Errorf("next committed LSN exceeds appended LSN %d > %d", committedLSN, s.localLog.AppendedLSN())
		}
		if err := s.setKey(KeyHardState, marshaled); err != nil {
			return fmt.Errorf("setting hard state key: %w", err)
		}
		if err := s.localLog.AcknowledgePosition(RaftCommittedPosition, committedLSN); err != nil {
			return fmt.Errorf("acknowledging committed position: %w", err)
		}
		// Auto-compaction and snapshots are not yet supported. So, the snapshot position will always be the same
		// as the committed position. It means the underlying local log manager can prune log entries older than
		// the snapshot position.
		if err := s.localLog.AcknowledgePosition(RaftSnapshotPosition, committedLSN); err != nil {
			return fmt.Errorf("acknowledging snapshot position: %w", err)
		}
		s.committedLSN = committedLSN
		return nil
	}(); err != nil {
		return err
	}

	if s.consumer != nil {
		s.consumer.NotifyNewEntries(s.authorityName, s.partitionID, s.localLog.LowWaterMark(), committedLSN)
	}
	return nil
}

func (s *ReplicaLogStore) readHardState() (raftpb.HardState, error) {
	var hardState raftpb.HardState
	if err := s.readKey(KeyHardState, func(value []byte) error {
		return hardState.Unmarshal(value)
	}); err != nil {
		if errors.Is(err, badger.ErrKeyNotFound) {
			return raftpb.HardState{}, nil
		}
		return raftpb.HardState{}, err
	}
	return hardState, nil
}

// saveConfState persists the latest conf state. It is used when generating snapshots.
func (s *ReplicaLogStore) saveConfState(confState raftpb.ConfState) error {
	marshaled, err := confState.Marshal()
	if err != nil {
		return fmt.Errorf("marshaling conf state: %w", err)
	}
	return s.setKey(KeyConfState, marshaled)
}

func (s *ReplicaLogStore) readConfState() (raftpb.ConfState, error) {
	var confState raftpb.ConfState
	if err := s.readKey(KeyConfState, func(value []byte) error {
		return confState.Unmarshal(value)
	}); err != nil {
		if errors.Is(err, badger.ErrKeyNotFound) {
			return raftpb.ConfState{}, nil
		}
		return raftpb.ConfState{}, err
	}
	return confState, nil
}
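// A worked example of the bookkeeping in saveHardState, for illustration only. Suppose appendedLSN = 15,
// the current committedLSN is 10, and etcd/raft hands over a hard state with Commit = 12:
//
//	// Hypothetical values; the hard state normally comes straight from a raft.Ready.
//	err := store.saveHardState(raftpb.HardState{Term: 4, Vote: 2, Commit: 12})
//
// The guard 12 <= 15 passes, the hard state is written under KeyHardState, both RaftCommittedPosition
// and RaftSnapshotPosition advance to 12, and the consumer (if any) is notified about entries up to
// LSN 12. A hard state with Commit = 16 would instead be rejected, because an LSN can never be
// committed before it has been appended locally.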
// draftLogEntry drafts a log entry and inserts it into the WAL at a certain position. The caller passes a callback
// function for setting the content of the log entry.
func (s *ReplicaLogStore) draftLogEntry(raftEntry raftpb.Entry, callback func(*wal.Entry) error) (returnedErr error) {
	// Create a temp directory for drafting the log entry. This directory will be moved to the state directory of
	// the local log manager. It's only cleaned up if there's an error along the way.
	logEntryPath, err := os.MkdirTemp(s.stagingDir, "")
	if err != nil {
		return fmt.Errorf("mkdir temp: %w", err)
	}
	defer func() {
		if returnedErr != nil {
			returnedErr = errors.Join(returnedErr, os.RemoveAll(logEntryPath))
		}
	}()

	// Draft a manifest and let the caller set its content.
	walEntry := wal.NewEntry(logEntryPath)
	if err := callback(walEntry); err != nil {
		return fmt.Errorf("modifying wal entry: %w", err)
	}

	// Write the manifest file.
	if err := wal.WriteManifest(s.ctx, walEntry.Directory(), &gitalypb.LogEntry{
		Operations: walEntry.Operations(),
	}); err != nil {
		return fmt.Errorf("writing manifest file: %w", err)
	}

	// The fsync is essential to flush the content of the manifest file itself. We also need to fsync the parent to
	// ensure the creation of the file is flushed. That part will be covered in insertLogEntry after the Raft
	// artifact file is created.
	if err := safe.NewSyncer().Sync(s.ctx, wal.ManifestPath(walEntry.Directory())); err != nil {
		return fmt.Errorf("sync raft manifest file: %w", err)
	}

	// Finally, insert it into the WAL.
	return s.insertLogEntry(raftEntry, logEntryPath)
}

// insertLogEntry inserts a log entry into the WAL at a certain position with the respective Raft metadata.
func (s *ReplicaLogStore) insertLogEntry(raftEntry raftpb.Entry, logEntryPath string) error {
	s.hooks.BeforeInsertLogEntry(raftEntry.Index)

	s.mutex.Lock()
	defer s.mutex.Unlock()

	lsn := storage.LSN(raftEntry.Index)

	// Although etcd/raft allows inserting a log entry at a pre-existing position, that position should not be less
	// than the committed LSN. Committed entries are properly applied to the persistent storage of this member.
	// Thus, there's nothing we can do about that except for rejecting the entry. The Raft protocol should
	// guarantee that this situation never happens.
	if lsn < s.committedLSN {
		return fmt.Errorf("inserted LSN at the point lower than committed LSN")
	}

	// It's normal for etcd/raft to request a log entry to be inserted at an existing index in the WAL.
	// That can occur when a log entry is not committed by the quorum, due to a network partition for example.
	// The new leader will send new log entries with a higher term to all members in the group for
	// acknowledgement. All members should then replace obsoleted entries with new ones. All log entries after
	// the replaced log entry should also be eventually removed.
	if lsn <= s.localLog.AppendedLSN() {
		if err := s.localLog.DeleteTrailingLogEntries(lsn); err != nil {
			return fmt.Errorf("deleting trailing log entries: %w", err)
		}
	}

	if err := s.writeRaftEntry(raftEntry, logEntryPath); err != nil {
		return fmt.Errorf("writing raft manifest: %w", err)
	}
	if _, err := s.localLog.CompareAndAppendLogEntry(lsn, logEntryPath); err != nil {
		return fmt.Errorf("inserting log entry to WAL: %w", err)
	}
	s.lastTerm = raftEntry.Term
	return nil
}
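// A worked example of the conflict handling above, for illustration only (store and stagedPath are
// assumed to exist). Suppose this replica has appended LSNs up to 10 but only LSNs up to 7 are
// committed. After a network partition, a new leader elected at a higher term replicates a different
// entry for index 8:
//
//	// Hypothetical entry; Data would carry a marshaled gitalypb.RaftEntry.
//	err := store.insertLogEntry(raftpb.Entry{Term: 5, Index: 8, Type: raftpb.EntryNormal}, stagedPath)
//
// Because 8 is not below committedLSN (7), the entry is accepted: DeleteTrailingLogEntries(8) drops the
// obsolete local entries 8..10, the new RAFT metadata is written, the entry is appended at LSN 8, and
// lastTerm becomes 5. An entry at an index lower than 7 would be rejected, since committed entries must
// never be rewritten.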
// writeRaftEntry writes the Raft metadata to the given position in the log.
func (s *ReplicaLogStore) writeRaftEntry(raftEntry raftpb.Entry, logEntryPath string) error {
	marshaledEntry, err := raftEntry.Marshal()
	if err != nil {
		return fmt.Errorf("marshaling raft entry: %w", err)
	}

	// Finalize the log entry by writing the RAFT file into the log entry's directory.
	manifestPath := raftManifestPath(logEntryPath)
	if err := os.WriteFile(manifestPath, marshaledEntry, mode.File); err != nil {
		return fmt.Errorf("writing raft manifest file: %w", err)
	}
	if err := safe.NewSyncer().Sync(s.ctx, manifestPath); err != nil {
		return fmt.Errorf("sync raft manifest file: %w", err)
	}
	if err := safe.NewSyncer().SyncParent(s.ctx, manifestPath); err != nil {
		return fmt.Errorf("sync raft manifest parent: %w", err)
	}
	return nil
}

// readRaftEntry returns the Raft metadata from the given position in the log.
func (s *ReplicaLogStore) readRaftEntry(lsn storage.LSN) (raftpb.Entry, error) {
	var raftEntry raftpb.Entry

	marshaledBytes, err := os.ReadFile(raftManifestPath(s.localLog.GetEntryPath(lsn)))
	if err != nil {
		return raftEntry, err
	}
	if err := raftEntry.Unmarshal(marshaledBytes); err != nil {
		return raftEntry, fmt.Errorf("unmarshal term: %w", err)
	}
	return raftEntry, nil
}

func (s *ReplicaLogStore) hasRaftEntry(lsn storage.LSN) (bool, error) {
	_, err := os.Stat(raftManifestPath(s.localLog.GetEntryPath(lsn)))
	if err == nil {
		return true, nil
	} else if os.IsNotExist(err) {
		return false, nil
	}
	return false, err
}

func (s *ReplicaLogStore) readLogEntry(lsn storage.LSN) (*gitalypb.LogEntry, error) {
	return wal.ReadManifest(s.localLog.GetEntryPath(lsn))
}

// Compile-time type check.
var _ = (raft.Storage)(&ReplicaLogStore{})
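// A sketch of the two-layer encoding used for each log entry, for illustration only: the RAFT file
// holds a marshaled raftpb.Entry, whose Data field in turn holds a marshaled gitalypb.RaftEntry, while
// the MANIFEST file holds the gitalypb.LogEntry. Reading an entry back therefore takes two unmarshaling
// steps (store and lsn are assumed to be in scope; error handling abbreviated):
//
//	raftEntry, err := store.readRaftEntry(lsn) // RAFT file -> raftpb.Entry
//	if err != nil {
//		return err
//	}
//	var payload gitalypb.RaftEntry
//	if err := proto.Unmarshal(raftEntry.Data, &payload); err != nil { // Data -> gitalypb.RaftEntry
//		return err
//	}
//	manifest, err := store.readLogEntry(lsn) // MANIFEST file -> gitalypb.LogEntry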