internal/gitaly/storage/raftmgr/replica_conf_change.go (162 lines of code) (raw):
package raftmgr
import (
"fmt"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"go.etcd.io/raft/v3/raftpb"
"google.golang.org/protobuf/proto"
)
// ConfChangeType represents the type of configuration change.
type ConfChangeType int
// Constants representing different configuration change types.
const (
ConfChangeAddNode ConfChangeType = iota
ConfChangeRemoveNode
ConfChangeUpdateNode
ConfChangeAddLearnerNode // Adds a node as a learner (non-voting member)
)
// ReplicaConfChange represents a single configuration change.
type ReplicaConfChange struct {
memberID uint64
changeType ConfChangeType
}
// ReplicaConfChanges is a wrapper around raftpb.ConfChangeI that provides
// a consistent interface for configuration changes regardless of the underlying
// implementation (ConfChange or ConfChangeV2).
type ReplicaConfChanges struct {
changes []ReplicaConfChange
metadata *gitalypb.ReplicaID_Metadata
term uint64
index uint64
leaderID uint64
}
// NewReplicaConfChanges creates a new ReplicaConfChanges instance.
func NewReplicaConfChanges(
term uint64,
index uint64,
leaderID uint64,
metadata *gitalypb.ReplicaID_Metadata,
) *ReplicaConfChanges {
return &ReplicaConfChanges{
changes: make([]ReplicaConfChange, 0),
metadata: metadata,
term: term,
index: index,
leaderID: leaderID,
}
}
// AddChange adds a configuration change to the changes list.
func (r *ReplicaConfChanges) AddChange(memberID uint64, nodeType ConfChangeType) {
r.changes = append(r.changes, ReplicaConfChange{
memberID: memberID,
changeType: nodeType,
})
}
// Changes returns the list of changes.
func (r *ReplicaConfChanges) Changes() []ReplicaConfChange {
return r.changes
}
// Metadata returns the metadata associated with the configuration changes.
func (r *ReplicaConfChanges) Metadata() *gitalypb.ReplicaID_Metadata {
return r.metadata
}
// Term returns the term of the configuration changes.
func (r *ReplicaConfChanges) Term() uint64 {
return r.term
}
// Index returns the index of the configuration changes.
func (r *ReplicaConfChanges) Index() uint64 {
return r.index
}
// LeaderID returns the leader ID associated with the configuration changes.
func (r *ReplicaConfChanges) LeaderID() uint64 {
return r.leaderID
}
// ToConfChangeV2 converts ReplicaConfChanges to a raftpb.ConfChangeV2.
func (r *ReplicaConfChanges) ToConfChangeV2() (raftpb.ConfChangeV2, error) {
if len(r.changes) == 0 {
return raftpb.ConfChangeV2{}, fmt.Errorf("no changes available to convert to ConfChangeV2")
}
changes := make([]raftpb.ConfChangeSingle, 0, len(r.changes))
for _, change := range r.changes {
var confType raftpb.ConfChangeType
switch change.changeType {
case ConfChangeAddNode:
confType = raftpb.ConfChangeAddNode
case ConfChangeRemoveNode:
confType = raftpb.ConfChangeRemoveNode
case ConfChangeUpdateNode:
confType = raftpb.ConfChangeUpdateNode
case ConfChangeAddLearnerNode:
confType = raftpb.ConfChangeAddLearnerNode
default:
return raftpb.ConfChangeV2{}, fmt.Errorf("unknown conf change type: %d", change.changeType)
}
changes = append(changes, raftpb.ConfChangeSingle{
Type: confType,
NodeID: change.memberID,
})
}
var context []byte
var err error
if r.metadata != nil {
context, err = proto.Marshal(r.metadata)
if err != nil {
return raftpb.ConfChangeV2{}, fmt.Errorf("marshal metadata: %w", err)
}
}
return raftpb.ConfChangeV2{
Context: context,
Changes: changes,
}, nil
}
// parseChangeType converts a raftpb.ConfChangeType to a ConfChangeType
func parseChangeType(ccType raftpb.ConfChangeType) (ConfChangeType, error) {
switch ccType {
case raftpb.ConfChangeAddNode:
return ConfChangeAddNode, nil
case raftpb.ConfChangeRemoveNode:
return ConfChangeRemoveNode, nil
case raftpb.ConfChangeUpdateNode:
return ConfChangeUpdateNode, nil
case raftpb.ConfChangeAddLearnerNode:
return ConfChangeAddLearnerNode, nil
default:
return ConfChangeType(0), fmt.Errorf("unknown conf change type: %d", ccType)
}
}
// parseMetadata extracts metadata from the context byte slice
func parseMetadata(context []byte) (*gitalypb.ReplicaID_Metadata, error) {
if len(context) == 0 {
return nil, nil
}
metadata := &gitalypb.ReplicaID_Metadata{}
if err := proto.Unmarshal(context, metadata); err != nil {
return nil, fmt.Errorf("unmarshal metadata: %w", err)
}
return metadata, nil
}
// The Convert function has been merged into ParseConfChange to reduce steps
// ParseConfChange parses a raftpb.Entry containing a configuration change directly into a ReplicaConfChanges.
// This handles unmarshalling for both EntryConfChange and EntryConfChangeV2 types and converts them
// directly to our unified format.
func ParseConfChange(entry raftpb.Entry, leaderID uint64) (*ReplicaConfChanges, error) {
if entry.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
if err := cc.Unmarshal(entry.Data); err != nil {
return nil, fmt.Errorf("unmarshalling EntryConfChange: %w", err)
}
metadata, err := parseMetadata(cc.Context)
if err != nil {
return nil, err
}
nodeType, err := parseChangeType(cc.Type)
if err != nil {
return nil, err
}
result := NewReplicaConfChanges(entry.Term, entry.Index, leaderID, metadata)
result.AddChange(cc.NodeID, nodeType)
return result, nil
} else if entry.Type == raftpb.EntryConfChangeV2 {
var cc raftpb.ConfChangeV2
if err := cc.Unmarshal(entry.Data); err != nil {
return nil, fmt.Errorf("unmarshalling EntryConfChangeV2: %w", err)
}
if len(cc.Changes) == 0 {
return nil, fmt.Errorf("no changes in ConfChangeV2")
}
metadata, err := parseMetadata(cc.Context)
if err != nil {
return nil, err
}
result := NewReplicaConfChanges(entry.Term, entry.Index, leaderID, metadata)
for _, change := range cc.Changes {
nodeType, err := parseChangeType(change.Type)
if err != nil {
return nil, err
}
result.AddChange(change.NodeID, nodeType)
}
return result, nil
}
return nil, fmt.Errorf("entry is not a configuration change: %s", entry.Type)
}