internal/gitaly/storage/raftmgr/routing_table.go (184 lines of code) (raw):

package raftmgr

import (
	"encoding/json"
	"errors"
	"fmt"
	"slices"
	"sync"

	"github.com/dgraph-io/badger/v4"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)

// routingKey builds the key under which a partition's routing table entry is
// stored. The key has the form "/raft/<authority name>/<partition ID>".
func routingKey(partitionKey *gitalypb.PartitionKey) []byte {
	return []byte(fmt.Sprintf("/raft/%s/%d", partitionKey.GetAuthorityName(), partitionKey.GetPartitionId()))
}

// RoutingTableEntry represents a Raft cluster's routing state for a partition.
// It includes the current leader, all replicas, and Raft consensus metadata.
//
// Entries are stored JSON-encoded. Term and Index order entries for the same
// partition: an entry only replaces a stored one when it has a higher Term,
// or the same Term and a higher Index.
type RoutingTableEntry struct {
	RelativePath string // For backward compatibility
	Replicas     []*gitalypb.ReplicaID
	LeaderID     uint64
	Term         uint64
	Index        uint64
}

// ReplicaMetadata contains additional information about a replica
// that is needed for routing messages.
type ReplicaMetadata struct {
	Address string
}

// RoutingTable handles translation between member IDs and addresses.
type RoutingTable interface {
	// Translate returns the replica registered under memberID for the given
	// partition.
	Translate(partitionKey *gitalypb.PartitionKey, memberID uint64) (*gitalypb.ReplicaID, error)
	// GetEntry returns the routing table entry stored for the given partition.
	GetEntry(partitionKey *gitalypb.PartitionKey) (*RoutingTableEntry, error)
	// UpsertEntry creates or replaces the routing table entry of the partition
	// that the entry's replicas belong to.
	UpsertEntry(entry RoutingTableEntry) error
	// ApplyReplicaConfChange applies a set of replica membership changes to
	// the given partition's entry and stores the result.
	ApplyReplicaConfChange(partitionKey *gitalypb.PartitionKey, changes *ReplicaConfChanges) error
}

// kvRoutingTable implements the RoutingTable interface on top of a
// transactional key-value store.
type kvRoutingTable struct {
	// kvStore is the store that holds the JSON-encoded routing table entries.
	kvStore keyvalue.Transactioner
	// mutex serializes writers to the routing table.
	mutex sync.RWMutex
}

// NewKVRoutingTable creates a new key-value based routing table implementation
// that persists routing information using badgerDB.
func NewKVRoutingTable(kvStore keyvalue.Store) *kvRoutingTable { prefix := []byte(fmt.Sprintf("p/%d", storagemgr.MetadataPartitionID)) prefixedStore := keyvalue.NewPrefixedTransactioner(kvStore, prefix) return &kvRoutingTable{ kvStore: prefixedStore, } } // UpsertEntry updates or creates a routing table entry func (r *kvRoutingTable) UpsertEntry(entry RoutingTableEntry) error { r.mutex.Lock() defer r.mutex.Unlock() return r.kvStore.Update(func(txn keyvalue.ReadWriter) error { if len(entry.Replicas) == 0 { return fmt.Errorf("no replicas to upsert") } partitionKey := entry.Replicas[0].GetPartitionKey() key := routingKey(partitionKey) item, err := txn.Get(key) if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return fmt.Errorf("get existing entry: %w", err) } var existing *RoutingTableEntry if item != nil { existing = &RoutingTableEntry{} if err := item.Value(func(val []byte) error { return json.Unmarshal(val, existing) }); err != nil { return fmt.Errorf("unmarshal existing entry: %w", err) } } // Only update if new entry has higher term or index if existing != nil { if entry.Term < existing.Term || (entry.Term == existing.Term && entry.Index <= existing.Index) { return fmt.Errorf("stale entry: current term=%d,index=%d, new term=%d,index=%d", existing.Term, existing.Index, entry.Term, entry.Index) } } data, err := json.Marshal(entry) if err != nil { return fmt.Errorf("marshal entry: %w", err) } if err := txn.Set(key, data); err != nil { return fmt.Errorf("set entry: %w", err) } return nil }) } // GetEntry retrieves a routing table entry func (r *kvRoutingTable) GetEntry(partitionKey *gitalypb.PartitionKey) (*RoutingTableEntry, error) { key := routingKey(partitionKey) var entry RoutingTableEntry if err := r.kvStore.View(func(txn keyvalue.ReadWriter) error { item, err := txn.Get(key) if err != nil { return err } return item.Value(func(value []byte) error { return json.Unmarshal(value, &entry) }) }); err != nil { return nil, fmt.Errorf("view: %w", err) } return 
&entry, nil } // Translate returns the storage name and address for a given partition key and member ID func (r *kvRoutingTable) Translate(partitionKey *gitalypb.PartitionKey, memberID uint64) (*gitalypb.ReplicaID, error) { entry, err := r.GetEntry(partitionKey) if err != nil { return nil, fmt.Errorf("get entry: %w", err) } for _, replica := range entry.Replicas { if replica.GetMemberId() == memberID { return replica, nil } } return nil, fmt.Errorf("no address found for memberID %d", memberID) } func (r *kvRoutingTable) ApplyReplicaConfChange(partitionKey *gitalypb.PartitionKey, changes *ReplicaConfChanges) error { routingTableEntry, err := r.GetEntry(partitionKey) if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return fmt.Errorf("getting routing table entry: %w", err) } if routingTableEntry == nil { routingTableEntry = &RoutingTableEntry{ Replicas: []*gitalypb.ReplicaID{}, } } routingTableEntry.LeaderID = changes.LeaderID() routingTableEntry.Term = changes.Term() routingTableEntry.Index = changes.Index() authorityName := partitionKey.GetAuthorityName() metadata := changes.Metadata() for _, confChange := range changes.Changes() { switch confChange.changeType { case ConfChangeAddNode: if confChange.memberID == 0 { return fmt.Errorf("member ID should be non-zero") } if slices.ContainsFunc(routingTableEntry.Replicas, func(r *gitalypb.ReplicaID) bool { return r.GetMemberId() == confChange.memberID }) { return fmt.Errorf("member ID %d already exists", confChange.memberID) } replica := &gitalypb.ReplicaID{ PartitionKey: partitionKey, MemberId: confChange.memberID, StorageName: authorityName, Metadata: metadata, Type: gitalypb.ReplicaID_REPLICA_TYPE_VOTER, } routingTableEntry.Replicas = append(routingTableEntry.Replicas, replica) case ConfChangeAddLearnerNode: if confChange.memberID == 0 { return fmt.Errorf("member ID should be non-zero") } if slices.ContainsFunc(routingTableEntry.Replicas, func(r *gitalypb.ReplicaID) bool { return r.GetMemberId() == 
confChange.memberID }) { return fmt.Errorf("member ID %d already exists as a replica", confChange.memberID) } learner := &gitalypb.ReplicaID{ PartitionKey: partitionKey, MemberId: confChange.memberID, StorageName: authorityName, Metadata: metadata, Type: gitalypb.ReplicaID_REPLICA_TYPE_LEARNER, } routingTableEntry.Replicas = append(routingTableEntry.Replicas, learner) case ConfChangeRemoveNode: if len(routingTableEntry.Replicas) == 0 { return fmt.Errorf("no replicas to remove") } routingTableEntry.Replicas = slices.DeleteFunc(routingTableEntry.Replicas, func(r *gitalypb.ReplicaID) bool { return r.GetMemberId() == confChange.memberID }) case ConfChangeUpdateNode: index := slices.IndexFunc(routingTableEntry.Replicas, func(r *gitalypb.ReplicaID) bool { return r.GetMemberId() == confChange.memberID }) if index == -1 { return fmt.Errorf("member ID %d not found for update", confChange.memberID) } routingTableEntry.Replicas[index].Metadata = metadata default: return fmt.Errorf("unknown conf change type: %d", confChange.changeType) } } // Update routing table with new entry if err := r.UpsertEntry(*routingTableEntry); err != nil { return fmt.Errorf("updating routing table: %w", err) } return nil }