internal/gitaly/storage/raftmgr/replica.go (663 lines of code) (raw):

package raftmgr

import (
	"context"
	"errors"
	"fmt"
	"runtime"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/wal"
	logging "gitlab.com/gitlab-org/gitaly/v16/internal/log"
	"gitlab.com/gitlab-org/gitaly/v16/internal/safe"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
	"google.golang.org/protobuf/proto"
)

// changeType labels the kind of membership change being proposed.
type changeType string

const (
	addVoter   changeType = "add_voter"
	addLearner changeType = "add_learner"
	removeNode changeType = "remove_node"
)

// RaftReplica is an interface that defines the methods to orchestrate the Raft consensus protocol
// for a partition in a Gitaly cluster.
type RaftReplica interface {
	// Embed all LogManager methods for WAL operations.
	storage.LogManager

	// Initialize prepares the Raft system with the given context and last applied LSN.
	// appliedLSN is the last log sequence number that was fully applied to the partition's
	// state, so that Raft processing begins from the correct point in the log history.
	Initialize(ctx context.Context, appliedLSN storage.LSN) error

	// Step processes a Raft message received from a remote member of the Raft group.
	// Such messages include heartbeats, vote requests, log entries, and other Raft
	// protocol communication between nodes.
	Step(ctx context.Context, msg raftpb.Message) error

	// AddNode adds a new node to the Raft cluster.
	// This operation can only be performed by the leader.
	AddNode(ctx context.Context, address string) error

	// RemoveNode removes a node from the Raft cluster.
	// This operation can only be performed by the leader.
	RemoveNode(ctx context.Context, memberID uint64) error

	// AddLearner adds a new non-voting learner node to the Raft cluster.
	// Learner nodes receive log entries but don't participate in elections.
	// This is typically used to bring new nodes up to speed before promoting them to voters.
	// This operation can only be performed by the leader.
	AddLearner(ctx context.Context, address string) error
}

var (
	// ErrObsoleted is returned when an event associated with a LSN is shadowed by another one with
	// higher term. That event must be unlocked and removed from the registry.
	ErrObsoleted = errors.New("event is obsolete, superseded by a recent log entry with higher term")
	// ErrReadyTimeout is returned when the replica times out while waiting for the Raft group to be ready.
	ErrReadyTimeout = errors.New("ready timeout exceeded")
	// ErrReplicaStopped is returned when the main loop of the Raft replica stops.
	ErrReplicaStopped = errors.New("raft replica stopped")
)

const (
	// defaultMaxSizePerMsg is the maximum size of individual Raft messages.
	defaultMaxSizePerMsg = 10 * 1024 * 1024
	// defaultMaxInflightMsgs is the maximum number of in-flight Raft messages. It controls how many
	// messages can be sent without acknowledgment.
	defaultMaxInflightMsgs = 256
)

// ready manages the readiness signaling of the Raft system.
type ready struct {
	c    chan error // Channel carrying the readiness result
	once sync.Once  // Ensures the readiness result is published exactly once
}

// set publishes err on the channel and then closes it. Only the first call has any effect;
// later calls are ignored.
func (r *ready) set(err error) {
	publish := func() {
		r.c <- err
		close(r.c)
	}
	r.once.Do(publish)
}

// ReplicaOptions configures optional parameters for the Raft Replica.
type ReplicaOptions struct { // readyTimeout sets the maximum duration to wait for Raft to become ready readyTimeout time.Duration // opTimeout sets the maximum duration for propose, append, and commit operations // This is primarily used in testing to detect deadlocks and performance issues opTimeout time.Duration // entryRecorder stores Raft log entries for testing purposes entryRecorder *ReplicaEntryRecorder } // OptionFunc defines a function type for configuring ReplicaOptions. type OptionFunc func(opt ReplicaOptions) ReplicaOptions // WithReadyTimeout sets the maximum duration to wait for Raft to become ready. // The default timeout is 5 times the election timeout. func WithReadyTimeout(t time.Duration) OptionFunc { return func(opt ReplicaOptions) ReplicaOptions { opt.readyTimeout = t return opt } } // WithOpTimeout sets a timeout for individual Raft operations. // This should only be used in testing environments. func WithOpTimeout(t time.Duration) OptionFunc { return func(opt ReplicaOptions) ReplicaOptions { opt.opTimeout = t return opt } } // WithEntryRecorder enables recording of Raft log entries for testing. func WithEntryRecorder(recorder *ReplicaEntryRecorder) OptionFunc { return func(opt ReplicaOptions) ReplicaOptions { opt.entryRecorder = recorder return opt } } // Replica orchestrates the Raft consensus protocol for a Gitaly partition. // Each partition is managed by a separate Raft consensus group. // The Replica is responsible for state synchronization, persistence, and communication // between members of the group. It handles the lifecycle including bootstrapping, // leader election, log replication, and membership changes. // // Internally, the Replica integrates with etcd/raft to implement the Raft consensus algorithm // and implements the storage.LogManager interface to interact with Gitaly's transaction system. // // A Replica is identified by a Replica ID, which consists of // (Partition ID, Member ID, Replica Storage Name). 
type Replica struct {
	// mutex serializes Initialize and Close.
	mutex sync.Mutex

	ctx    context.Context // Context for controlling replica's lifecycle; created in Initialize
	cancel context.CancelFunc

	authorityName string                // Name of the storage this partition belongs to
	ptnID         storage.PartitionID   // Unique identifier for the managed partition
	node          raft.Node             // etcd/raft node representation; assigned in Initialize
	raftCfg       config.Raft           // etcd/raft configurations
	options       ReplicaOptions        // Additional replica configuration
	logger        logging.Logger        // Internal logging
	logStore      *ReplicaLogStore      // Persistent storage for Raft logs and state
	registry      *ReplicaEventRegistry // Event tracking
	leadership    *ReplicaLeadership    // Current leadership information
	syncer        safe.Syncer           // Synchronization operations
	wg            sync.WaitGroup        // Goroutine lifecycle management; waited on in Close
	ready         *ready                // Initialization state tracking
	started       bool                  // Indicates if replica has been started
	metrics       RaftMetrics           // Scoped metrics for this replica

	// Reference to the RaftEnabledStorage that contains this replica
	raftEnabledStorage *RaftEnabledStorage

	// notifyQueue signals new changes or errors to clients
	// Clients must process signals promptly to prevent blocking
	notifyQueue chan error

	// EntryRecorder stores Raft log entries for testing
	EntryRecorder *ReplicaEntryRecorder

	// hooks is a collection of hooks, used in test environment to intercept critical events in the replica
	hooks replicaHooks
}

// applyOptions creates and validates replica options by applying provided option functions
// to a default configuration.
func applyOptions(raftCfg config.Raft, opts []OptionFunc) (ReplicaOptions, error) { baseRTT := time.Duration(raftCfg.RTTMilliseconds) * time.Millisecond options := ReplicaOptions{ // Default readyTimeout is 5 times the election timeout to allow for initial self-elections readyTimeout: 5 * time.Duration(raftCfg.ElectionTicks) * baseRTT, } for _, opt := range opts { options = opt(options) } if options.readyTimeout == 0 { return options, fmt.Errorf("readyTimeout must not be zero") } else if options.readyTimeout < time.Duration(raftCfg.ElectionTicks)*baseRTT { return options, fmt.Errorf("readyTimeout must not be less than election timeout") } return options, nil } // RaftReplicaFactory defines a function type that creates a new Raft Replica instance. // This factory is used to create and initialize Replica objects for partitions. type RaftReplicaFactory func( storageName string, partitionID storage.PartitionID, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, ) (*Replica, error) // DefaultFactoryWithNode enhances the factory to connect newly created replicas with raft-enabled storage. // This function creates a Replica and registers it with the appropriate RaftEnabledStorage. func DefaultFactoryWithNode(raftCfg config.Raft, raftNode *Node, opts ...OptionFunc) RaftReplicaFactory { return func( storageName string, partitionID storage.PartitionID, logStore *ReplicaLogStore, logger logging.Logger, metrics *Metrics, ) (*Replica, error) { storage, err := raftNode.GetStorage(storageName) if err != nil { return nil, fmt.Errorf("get storage %q: %w", storageName, err) } raftEnabledStorage, ok := storage.(*RaftEnabledStorage) if !ok { return nil, fmt.Errorf("storage %q is not a RaftEnabledStorage", storageName) } replica, err := NewReplica(storageName, partitionID, raftCfg, logStore, raftEnabledStorage, logger, metrics, opts...) 
if err != nil { return nil, fmt.Errorf("create replica %q: %w", storageName, err) } if err := raftEnabledStorage.RegisterReplica(partitionID, replica); err != nil { return nil, fmt.Errorf("register replica for partition %d in storage %q: %w", partitionID, storageName, err) } return replica, nil } } // NewReplica creates an instance of Replica for a specific partition. // This function initializes the Replica with the provided configuration but does not // start the Raft protocol. The Initialize method must be called separately to start // the Raft protocol operation. func NewReplica( authorityName string, partitionID storage.PartitionID, raftCfg config.Raft, logStore *ReplicaLogStore, raftEnabledStorage *RaftEnabledStorage, logger logging.Logger, metrics *Metrics, opts ...OptionFunc, ) (*Replica, error) { if !raftCfg.Enabled { return nil, fmt.Errorf("raft is not enabled") } options, err := applyOptions(raftCfg, opts) if err != nil { return nil, fmt.Errorf("invalid raft replica option: %w", err) } if raftEnabledStorage == nil { return nil, fmt.Errorf("raft enabled storage is required") } logger = logger.WithFields(logging.Fields{ "component": "raft", "raft.authority": authorityName, "raft.partition": partitionID, }) scopedMetrics := metrics.Scope(authorityName) return &Replica{ authorityName: authorityName, ptnID: partitionID, raftCfg: raftCfg, options: options, logStore: logStore, logger: logger, registry: NewReplicaEventRegistry(scopedMetrics), syncer: safe.NewSyncer(), leadership: NewLeadership(), ready: &ready{c: make(chan error, 1)}, notifyQueue: make(chan error, 1), EntryRecorder: options.entryRecorder, metrics: scopedMetrics, raftEnabledStorage: raftEnabledStorage, hooks: noopHooks(), }, nil } // Initialize starts the Raft replica by: // - Loading or bootstrapping the Raft state // - Initializing the etcd/raft Node // - Starting the processing goroutine // // The appliedLSN parameter indicates the last log sequence number that was fully applied // to the 
partition's state. This ensures that Raft processing begins from the correct point // in the log history. // // When a partition is bootstrapped for the first time, the Replica initializes the etcd/raft // state machine, elects itself as the initial leader, and persists all Raft metadata to // persistent storage. Its internal Member ID is always 1 at this stage, making it a fully // functional single-node Raft instance. Later, when new members join, they'll receive // unique Member IDs based on the LSN of the config change entry. func (replica *Replica) Initialize(ctx context.Context, appliedLSN storage.LSN) error { replica.mutex.Lock() defer replica.mutex.Unlock() if replica.started { return fmt.Errorf("raft replica for partition %q already started", replica.ptnID) } replica.started = true replica.ctx, replica.cancel = context.WithCancel(ctx) initStatus, err := replica.logStore.initialize(ctx, appliedLSN) if err != nil { return fmt.Errorf("failed to load raft initial state: %w", err) } // etcd/raft uses an integer ID (Member ID) to identify a member of a Raft group. This Member ID is part of // the Replica ID which consists of (Partition ID, Member ID, Replica Storage Name). // // The Member ID system yields several benefits: // - No need to set the member ID statically, avoiding the need for a composite key of the storage // name and member ID. // - No need for a global node registration system, as IDs are generated within the group. // - Works better in scenarios where a member leaves and then re-joins the cluster. Each join event // results in a new unique member ID, preventing ambiguity. // // When a partition is first bootstrapped, we use a fixed member ID of 1 for the initial member. // When new members join a Raft group, the leader issues a Config Change entry containing the metadata // of the storage. The new member's Member ID is assigned the LSN of this log entry, ensuring unambiguous // identification across the group's lifetime. 
// // https://gitlab.com/gitlab-org/gitaly/-/issues/6304 tracks the work to bootstrap new cluster members. var memberID uint64 = 1 config := &raft.Config{ ID: memberID, ElectionTick: int(replica.raftCfg.ElectionTicks), HeartbeatTick: int(replica.raftCfg.HeartbeatTicks), Storage: replica.logStore, MaxSizePerMsg: defaultMaxSizePerMsg, MaxInflightMsgs: defaultMaxInflightMsgs, Logger: &raftLogger{logger: replica.logger.WithFields(logging.Fields{"raft.component": "replica"})}, // We disable automatic proposal forwarding provided by etcd/raft because it would bypass Gitaly's // transaction validation system. In Gitaly, each transaction is verified against the latest state // before being committed. If proposal forwarding is enabled, replica nodes would have the ability to // start transactions independently and propose them to the leader for commit. // // Replica: A -> B -> C -> Start D ------------> Forward to Leader ----| // Leader: A -> B -> C -> Start E -> Commit E ----------------------> Receive D -> Commit D // In this scenario, D is not verified against E even though E commits before D. // // Instead, we'll implement explicit request routing at the RPC layer to ensure all writes go through // proper verification on the leader. 
// See https://gitlab.com/gitlab-org/gitaly/-/issues/6465 DisableProposalForwarding: true, } switch initStatus { case InitStatusUnbootstrapped: // For first-time bootstrap, initialize with self as the only peer peers := []raft.Peer{{ID: memberID}} replica.node = raft.StartNode(config, peers) case InitStatusBootstrapped: // For restarts, set Applied to latest committed LSN // WAL considers entries committed once they are in the Raft log config.Applied = uint64(replica.logStore.readCommittedLSN()) replica.node = raft.RestartNode(config) case InitStatusNeedsBackfill: // For migrations from non-Raft to Raft storage, we need to establish initial Raft state through these // steps: Create a configuration with the node itself as the only voter and set the commit point to // include all existing backfilled entries, ensuring they're considered committed by the Raft protocol. if err := replica.logStore.saveConfState(raftpb.ConfState{ Voters: []uint64{memberID}, }); err != nil { return fmt.Errorf("saving conf state: %w", err) } if err := replica.logStore.saveHardState(raftpb.HardState{ Vote: memberID, Commit: uint64(replica.logStore.readCommittedLSN()), }); err != nil { return fmt.Errorf("saving hard state: %w", err) } config.Applied = uint64(replica.logStore.readCommittedLSN()) replica.node = raft.RestartNode(config) default: return fmt.Errorf("raft bootstrapping returns unknown status without any error") } go replica.run(initStatus) select { case <-time.After(replica.options.readyTimeout): return ErrReadyTimeout case err := <-replica.ready.c: return err } } // run executes the main Raft event loop, processing ticks, ready states, and notifications. 
func (replica *Replica) run(initStatus InitStatus) { replica.wg.Add(1) defer replica.wg.Done() ticker := time.NewTicker(time.Duration(replica.raftCfg.RTTMilliseconds) * time.Millisecond) defer ticker.Stop() // For bootstrapped clusters, mark ready immediately since state is already established // For new clusters, wait for first config change if initStatus != InitStatusUnbootstrapped { replica.signalReady() } // Main event processing loop for { select { case <-ticker.C: // Drive the etcd/raft internal clock // Election and timeout depend on tick count replica.node.Tick() case rd, ok := <-replica.node.Ready(): if err := replica.safeExec(func() error { if !ok { return fmt.Errorf("raft node Ready channel unexpectedly closed") } if err := replica.handleReady(&rd); err != nil { return err } replica.hooks.BeforeAdvance() replica.node.Advance() return nil }); err != nil { replica.handleFatalError(err) return } case err := <-replica.logStore.localLog.GetNotificationQueue(): // Forward storage notifications if err == nil { select { case replica.notifyQueue <- nil: default: // Non-critical if we can't send a nil notification } } else { replica.handleFatalError(err) return } case <-replica.ctx.Done(): err := replica.ctx.Err() if !errors.Is(err, context.Canceled) { replica.handleFatalError(err) } return } } } // safeExec executes a function and recovers from panics, converting them to errors func (replica *Replica) safeExec(fn func() error) (err error) { defer func() { if r := recover(); r != nil { switch v := r.(type) { case error: err = fmt.Errorf("panic recovered: %w", v) default: err = fmt.Errorf("panic recovered: %v", r) } // Capture stack trace for debugging stack := make([]byte, 4096) stack = stack[:runtime.Stack(stack, false)] err := fmt.Errorf("raft replica panic: %v", r) replica.logger.WithError(err).WithField("error.stack", string(stack)).Error("raft replica panic recovered") } }() return fn() } // handleFatalError handles a fatal error that requires the run loop to 
terminate func (replica *Replica) handleFatalError(err error) { // Set back to ready to unlock the caller of Initialize(). replica.signalError(ErrReplicaStopped) // Unlock all waiters of AppendLogEntry about the replica being stopped. replica.registry.UntrackAll(ErrReplicaStopped) replica.metrics.eventLoopCrashes.Inc() replica.logger.WithError(err).Error("raft event loop failed") // Ensure error is sent to notification queue. replica.notifyQueue <- err } // Close gracefully shuts down the Raft replica and its components. func (replica *Replica) Close() error { replica.mutex.Lock() defer replica.mutex.Unlock() if !replica.started { return nil } replica.node.Stop() replica.cancel() replica.wg.Wait() if replica.raftEnabledStorage != nil { // Mostly for tests; raftEnabledStorage should never be nil in practice. replica.raftEnabledStorage.DeregisterReplica(replica) } return replica.logStore.close() } // GetNotificationQueue returns the channel used to notify external components of changes. func (replica *Replica) GetNotificationQueue() <-chan error { return replica.notifyQueue } // GetEntryPath returns the filesystem path for a given log entry. func (replica *Replica) GetEntryPath(lsn storage.LSN) string { return replica.logStore.localLog.GetEntryPath(lsn) } // AcknowledgePosition marks log entries up to and including the given LSN // as successfully processed for the specified position type. Raft replica // doesn't handle this directly. It propagates to the local log manager. func (replica *Replica) AcknowledgePosition(t storage.PositionType, lsn storage.LSN) error { return replica.logStore.localLog.AcknowledgePosition(t, lsn) } // AppendedLSN returns the LSN of the most recently appended log entry. func (replica *Replica) AppendedLSN() storage.LSN { return replica.logStore.readCommittedLSN() } // LowWaterMark returns the earliest LSN that should be retained. // Log entries before this LSN can be safely removed. 
func (replica *Replica) LowWaterMark() storage.LSN { lsn, _ := replica.logStore.FirstIndex() return storage.LSN(lsn) } // AppendLogEntry proposes a new log entry to the Raft group. // It blocks until the entry is committed, timeout occurs, or the cluster rejects it. // // This function is part of the storage.LogManager interface implementation and serves // as the integration point between Gitaly's transaction system and the Raft consensus protocol. // When a transaction is committed, its log entry is proposed through this method. // // Each partition maintains its own independent log with monotonic LSNs. // All repositories within a partition share the same log. func (replica *Replica) AppendLogEntry(logEntryPath string) (_ storage.LSN, returnedErr error) { replica.wg.Add(1) defer replica.wg.Done() // Start timing proposal duration proposalTimer := prometheus.NewTimer(replica.metrics.proposalDurationSec) defer func() { proposalTimer.ObserveDuration() result := "success" if returnedErr != nil { result = "error" } replica.metrics.proposalsTotal.WithLabelValues(result).Inc() }() w := replica.registry.Register() defer replica.registry.Untrack(w.ID) message := &gitalypb.RaftEntry{ Id: uint64(w.ID), Data: &gitalypb.RaftEntry_LogData{ LocalPath: []byte(logEntryPath), }, } data, err := proto.Marshal(message) if err != nil { return 0, fmt.Errorf("marshaling Raft message: %w", err) } ctx := replica.ctx // Set an optional timeout to prevent proposal processing takes forever. This option is // more useful in testing environments. 
if replica.options.opTimeout != 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(replica.ctx, replica.options.opTimeout) defer cancel() } replica.hooks.BeforePropose(logEntryPath) if err := replica.node.Propose(ctx, data); err != nil { return 0, fmt.Errorf("proposing Raft message: %w", err) } select { case <-ctx.Done(): return 0, ctx.Err() case err := <-w.C: return w.LSN, err } } // CompareAndAppendLogEntry is a variant of AppendLogEntry. It appends the log entry to the write-ahead log if and only // if the inserting position matches the expected LSN. Raft replica doesn't implement this method. The LSN is allocated // by the underlying Raft engine. It cannot guarantee the inserted LSN matches before actual insertion. func (replica *Replica) CompareAndAppendLogEntry(lsn storage.LSN, logEntryPath string) (storage.LSN, error) { return 0, fmt.Errorf("raft replica does not support CompareAndAppendLogEntry") } // DeleteLogEntry deletes the log entry at the given LSN from the log. Raft replica doesn't support log entry deletion. // After an entry is persisted, it is then sent to other members in the raft group for acknowledgement. There is no good // way to withdraw this submission. func (replica *Replica) DeleteLogEntry(lsn storage.LSN) error { return fmt.Errorf("raft replica does not support DeleteLogEntry") } // NotifyNewEntries signals to the notification queue that a newly written log entry is available for consumption. func (replica *Replica) NotifyNewEntries() { replica.notifyQueue <- nil } // handleReady processes the next state signaled by etcd/raft through three main steps: // 1. Persist states (SoftState, HardState, and uncommitted Entries) // 2. Send messages to other members via Transport // 3. Process committed entries (entries acknowledged by the majority) // // This is the core of the Raft consensus protocol implementation. 
Each Replica // independently processes its own Ready states, allowing many Raft groups // to operate simultaneously on a single Gitaly server. // // In the current single-node implementation, entries are committed immediately without network // communication. In a multi-node setup, entries will be replicated to other members and will // only be committed once acknowledged by a majority. // // See: https://pkg.go.dev/go.etcd.io/raft/v3#section-readme func (replica *Replica) handleReady(rd *raft.Ready) error { replica.hooks.BeforeHandleReady() // Handle volatile state updates for leadership tracking and observability if err := replica.handleSoftState(rd); err != nil { return fmt.Errorf("handling soft state: %w", err) } // In https://pkg.go.dev/go.etcd.io/raft/v3#section-readme, the // library recommends saving entries, hard state, and snapshots // atomically. We can't currently do this because we use different // backends to persist these items: // - entries are appended to WAL // - hard state is persisted to the KV DB // - snapshots are written to a directory // // Each partition has its own WAL that operates independently. // All repositories within a partition share the same monotonic log sequence number (LSN). // Persist new log entries to disk. These entries are not yet committed // and may be superseded by entries with the same LSN but higher term. // WAL will clean up any overlapping entries. if err := replica.saveEntries(rd); err != nil { return fmt.Errorf("saving entries: %w", err) } // Persist essential state needed for crash recovery if err := replica.handleHardState(rd); err != nil { return fmt.Errorf("handling hard state: %w", err) } // Send messages to other members in the cluster. // Note: While the Raft thesis (section 10.2) suggests pipelining this step // for parallel processing after disk persistence, our WAL currently serializes // transactions. This optimization may become relevant when WAL supports // concurrent transaction processing. 
// Reference: https://github.com/ongardie/dissertation/blob/master/stanford.pdf // // The current implementation does not include Raft snapshotting or log compaction. // This means the log will grow indefinitely until manually truncated. // // In a future implementation, periodic snapshots will allow the log to be trimmed // by removing entries that have been incorporated into a snapshot. // See https://gitlab.com/gitlab-org/gitaly/-/issues/6463 if err := replica.sendMessages(rd); err != nil { return fmt.Errorf("sending messages: %w", err) } // Process committed entries in WAL. In single-node clusters, entries will be // committed immediately without network communication since there's no need for // consensus with other members. if err := replica.processCommitEntries(rd); err != nil { return fmt.Errorf("processing committed entries: %w", err) } return nil } // saveEntries persists new log entries to storage and handles their recording if enabled. func (replica *Replica) saveEntries(rd *raft.Ready) error { if len(rd.Entries) == 0 { return nil } // Remove in-flight events with duplicate LSNs but lower terms // WAL will clean up corresponding entries on disk // Events without LSNs are preserved as they haven't reached this stage firstLSN := storage.LSN(rd.Entries[0].Index) replica.registry.UntrackSince(firstLSN, ErrObsoleted) for i := range rd.Entries { lsn := storage.LSN(rd.Entries[i].Index) switch rd.Entries[i].Type { case raftpb.EntryNormal: if len(rd.Entries[i].Data) == 0 { // Handle empty entries (typically internal Raft entries) if err := replica.logStore.draftLogEntry(rd.Entries[i], func(w *wal.Entry) error { return nil }); err != nil { return fmt.Errorf("inserting config change log entry: %w", err) } if err := replica.recordEntryIfNeeded(true, lsn); err != nil { return fmt.Errorf("recording log entry: %w", err) } if replica.metrics.logEntriesProcessed != nil { replica.metrics.logEntriesProcessed.WithLabelValues("append", "verify").Inc() } } else { // Handle 
normal entries containing RaftMessage data var message gitalypb.RaftEntry if err := proto.Unmarshal(rd.Entries[i].Data, &message); err != nil { return fmt.Errorf("unmarshalling entry type: %w", err) } logEntryPath := string(message.GetData().GetLocalPath()) if err := replica.logStore.insertLogEntry(rd.Entries[i], logEntryPath); err != nil { return fmt.Errorf("appending log entry: %w", err) } if err := replica.recordEntryIfNeeded(false, lsn); err != nil { return fmt.Errorf("recording log entry: %w", err) } replica.registry.AssignLSN(EventID(message.GetId()), lsn) if replica.metrics.logEntriesProcessed != nil { replica.metrics.logEntriesProcessed.WithLabelValues("append", "application").Inc() } } case raftpb.EntryConfChange, raftpb.EntryConfChangeV2: // Handle configuration change entries if err := replica.logStore.draftLogEntry(rd.Entries[i], func(w *wal.Entry) error { marshaledValue, err := proto.Marshal(lsn.ToProto()) if err != nil { return fmt.Errorf("marshal value: %w", err) } w.SetKey(KeyLastConfigChange, marshaledValue) return nil }); err != nil { return fmt.Errorf("inserting config change log entry: %w", err) } if err := replica.recordEntryIfNeeded(true, lsn); err != nil { return fmt.Errorf("recording log entry: %w", err) } if replica.metrics.logEntriesProcessed != nil { replica.metrics.logEntriesProcessed.WithLabelValues("append", "config_change").Inc() } default: return fmt.Errorf("raft entry type not supported: %s", rd.Entries[i].Type) } } return nil } // processCommitEntries processes entries that have been committed by the Raft consensus // and updates the system state accordingly. 
func (replica *Replica) processCommitEntries(rd *raft.Ready) error { replica.hooks.BeforeProcessCommittedEntries(rd.CommittedEntries) for i := range rd.CommittedEntries { var shouldNotify bool switch rd.CommittedEntries[i].Type { case raftpb.EntryNormal: var message gitalypb.RaftEntry if err := proto.Unmarshal(rd.CommittedEntries[i].Data, &message); err != nil { return fmt.Errorf("unmarshalling entry type: %w", err) } // Notification logic: // 1. For internal entries (those NOT tracked in the registry), we notify because // the caller isn't aware of these automatically generated entries // 2. For caller-issued entries (those tracked in the registry), we don't notify // since the caller already knows about these entries // The Untrack() method returns true for tracked entries and the unlocks waiting channel in // AppendLogEntry(). Callers must handle concurrent modifications appropriately shouldNotify = !replica.registry.Untrack(EventID(message.GetId())) if replica.metrics.logEntriesProcessed != nil { if len(rd.CommittedEntries[i].Data) == 0 { replica.metrics.logEntriesProcessed.WithLabelValues("commit", "verify").Inc() } else { replica.metrics.logEntriesProcessed.WithLabelValues("commit", "application").Inc() } } case raftpb.EntryConfChange, raftpb.EntryConfChangeV2: if err := replica.processConfChange(rd.CommittedEntries[i]); err != nil { return fmt.Errorf("processing config change: %w", err) } shouldNotify = true if replica.metrics.logEntriesProcessed != nil { replica.metrics.logEntriesProcessed.WithLabelValues("commit", "config_change").Inc() } default: return fmt.Errorf("raft entry type not supported: %s", rd.CommittedEntries[i].Type) } if shouldNotify { select { case replica.notifyQueue <- nil: default: } } } return nil } // processConfChange processes committed configuration change entries. // This function handles membership changes in the Raft group and updates the routing table. 
func (replica *Replica) processConfChange(entry raftpb.Entry) error { replicaChanges, err := ParseConfChange(entry, replica.leadership.GetLeaderID()) if err != nil { return fmt.Errorf("parsing conf changes: %w", err) } cc, err := replicaChanges.ToConfChangeV2() if err != nil { return fmt.Errorf("converting replica changes to etcd/raft config changes: %w", err) } // Apply the configuration change to the Raft node confState := replica.node.ApplyConfChange(cc) if err := replica.logStore.saveConfState(*confState); err != nil { return fmt.Errorf("saving config state: %w", err) } partitionKey := &gitalypb.PartitionKey{ AuthorityName: replica.authorityName, PartitionId: uint64(replica.ptnID), } routingTable := replica.raftEnabledStorage.GetRoutingTable() if routingTable == nil { return fmt.Errorf("routing table not found") } // Apply the changes to the routing table if err := routingTable.ApplyReplicaConfChange(partitionKey, replicaChanges); err != nil { return fmt.Errorf("applying conf changes: %w", err) } // Signal readiness after first config change. Applies only to new clusters that have not been bootstrapped. Not // needed for subsequent restarts replica.signalReady() return nil } // sendMessages delivers pending Raft messages to other members via the transport layer. // This function is responsible for sending Raft protocol messages between members. func (replica *Replica) sendMessages(rd *raft.Ready) error { replica.hooks.BeforeSendMessages() if len(rd.Messages) > 0 { // This code path will be properly implemented when network communication is added. // When implemented, this will use gRPC to transfer messages through a single RPC, // `RaftService.SendMessage`, which enhances Raft messages with partition identity metadata. // // To mitigate the "chatty" nature of the Raft protocol, Gitaly will implement // techniques such as batching health checks and quiescing inactive groups. 
// // See https://gitlab.com/gitlab-org/gitaly/-/issues/6304 replica.logger.Error("networking for raft cluster is not implemented yet") } return nil } // handleSoftState processes changes to volatile state like leadership and logs significant changes. // // Leadership changes are frequent but not broadcasted to the routing table due to // potential high frequency. Instead, only replica set changes are updated in the routing table. func (replica *Replica) handleSoftState(rd *raft.Ready) error { state := rd.SoftState if state == nil { return nil } prevLeader := replica.leadership.GetLeaderID() changed, duration := replica.leadership.SetLeader(state.Lead, state.RaftState == raft.StateLeader) if changed { replica.logger.WithFields(logging.Fields{ "raft.leader_id": replica.leadership.GetLeaderID(), "raft.is_leader": replica.leadership.IsLeader(), "raft.previous_leader_id": prevLeader, "raft.leadership_duration": duration, }).Info("leadership updated") } return nil } // handleHardState persists critical Raft state required for crash recovery. func (replica *Replica) handleHardState(rd *raft.Ready) error { if raft.IsEmptyHardState(rd.HardState) { return nil } if err := replica.logStore.saveHardState(rd.HardState); err != nil { return fmt.Errorf("saving hard state: %w", err) } return nil } // recordEntryIfNeeded records log entries when entry recording is enabled, // typically used for testing and debugging. 
func (replica *Replica) recordEntryIfNeeded(fromRaft bool, lsn storage.LSN) error { if replica.EntryRecorder != nil { logEntry, err := replica.logStore.readLogEntry(lsn) if err != nil { return fmt.Errorf("reading log entry: %w", err) } replica.EntryRecorder.Record(fromRaft, lsn, logEntry) } return nil } func (replica *Replica) signalReady() { replica.ready.set(nil) } func (replica *Replica) signalError(err error) { replica.ready.set(err) } // Step processes a Raft message from a remote node func (replica *Replica) Step(ctx context.Context, msg raftpb.Message) error { if !replica.started { return fmt.Errorf("raft replica not started") } return replica.node.Step(ctx, msg) } // AddNode implements RaftReplica.AddNode func (replica *Replica) AddNode(ctx context.Context, address string) error { memberID := uint64(replica.AppendedLSN() + 1) return replica.proposeMembershipChange(ctx, string(addVoter), memberID, ConfChangeAddNode, &gitalypb.ReplicaID_Metadata{ Address: address, }) } // RemoveNode implements RaftReplica.RemoveNode func (replica *Replica) RemoveNode(ctx context.Context, memberID uint64) error { return replica.proposeMembershipChange(ctx, string(removeNode), memberID, ConfChangeRemoveNode, nil) } // AddLearner implements RaftReplica.AddLearner func (replica *Replica) AddLearner(ctx context.Context, address string) error { memberID := uint64(replica.AppendedLSN() + 1) return replica.proposeMembershipChange(ctx, string(addLearner), memberID, ConfChangeAddLearnerNode, &gitalypb.ReplicaID_Metadata{ Address: address, }) } // proposeMembershipChange is a helper function that handles the common pattern for membership changes. // It checks leadership and proposes the configuration change. 
func (replica *Replica) proposeMembershipChange( ctx context.Context, changeType string, memberID uint64, confChangeType ConfChangeType, metadata *gitalypb.ReplicaID_Metadata, ) error { if !replica.leadership.IsLeader() { replica.metrics.IncMembershipError(changeType, "not_leader") return fmt.Errorf("replica is not the leader") } if confChangeType == ConfChangeRemoveNode { routingTable := replica.raftEnabledStorage.GetRoutingTable() if routingTable == nil { return fmt.Errorf("routing table not found") } if err := checkMemberID(replica, memberID, routingTable); err != nil { return fmt.Errorf("checking member ID: %w", err) } } changes := NewReplicaConfChanges( replica.node.Status().Term, uint64(replica.AppendedLSN()), replica.leadership.GetLeaderID(), metadata, ) changes.AddChange(memberID, confChangeType) cc, err := changes.ToConfChangeV2() if err != nil { return fmt.Errorf("convert to conf change v2: %w", err) } if err := replica.node.ProposeConfChange(ctx, cc); err != nil { replica.metrics.IncMembershipError(changeType, "propose_failed") return fmt.Errorf("propose conf change: %w", err) } replica.metrics.IncMembershipChange(changeType) return nil } func checkMemberID(replica *Replica, memberID uint64, routingTable RoutingTable) error { partitionKey := &gitalypb.PartitionKey{ AuthorityName: replica.authorityName, PartitionId: uint64(replica.ptnID), } _, err := routingTable.Translate(partitionKey, memberID) if err != nil { return fmt.Errorf("translating member ID: %w", err) } return nil } var _ = (storage.LogManager)(&Replica{}) // Ensure Replica implements LogManager interface