internal/gitaly/storage/raftmgr/raft_enabled_storage.go (83 lines of code) (raw):
package raftmgr
import (
"fmt"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/keyvalue/databasemgr"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)
// RaftEnabledStorage wraps a storage.Storage instance with Raft functionality
type RaftEnabledStorage struct {
storage.Storage
transport Transport
routingTable RoutingTable
replicaRegistry ReplicaRegistry
}
// GetTransport returns the Raft transport for this storage
func (s *RaftEnabledStorage) GetTransport() Transport {
return s.transport
}
// GetRoutingTable returns the Raft routing table for this storage
func (s *RaftEnabledStorage) GetRoutingTable() RoutingTable {
return s.routingTable
}
// GetReplicaRegistry returns the replica registry for this storage
func (s *RaftEnabledStorage) GetReplicaRegistry() ReplicaRegistry {
return s.replicaRegistry
}
// RegisterReplica registers a replica with this RaftEnabledStorage
// This should be called after both the replica and RaftEnabledStorage are created
func (s *RaftEnabledStorage) RegisterReplica(partitionID storage.PartitionID, replica *Replica) error {
partitionKey := &gitalypb.PartitionKey{
PartitionId: uint64(partitionID),
AuthorityName: replica.authorityName,
}
s.replicaRegistry.RegisterReplica(partitionKey, replica)
return nil
}
// DeregisterReplica removes a replica from this RaftEnabledStorage.
// This should be called when the replica is closing.
func (s *RaftEnabledStorage) DeregisterReplica(replica *Replica) {
partitionKey := &gitalypb.PartitionKey{
PartitionId: uint64(replica.ptnID),
AuthorityName: replica.authorityName,
}
s.replicaRegistry.DeregisterReplica(partitionKey)
}
// Node adds Raft functionality to each storage
type Node struct {
storages map[string]*RaftEnabledStorage
}
// NewNode creates a new Node with Raft functionality.
// The Storage field in RaftEnabledStorage will be nil
// and must be populated later.
func NewNode(cfg config.Cfg, logger log.Logger, dbMgr *databasemgr.DBManager, connsPool *client.Pool) (*Node, error) {
n := &Node{
storages: make(map[string]*RaftEnabledStorage),
}
for _, cfgStorage := range cfg.Storages {
var baseStorage storage.Storage // Can be nil initially
// Get the storage's KV store for the routing table
kvStore, err := dbMgr.GetDB(cfgStorage.Name)
if err != nil {
return nil, fmt.Errorf("get KV store for storage %q: %w", cfgStorage.Name, err)
}
// Create per-storage Raft components
routingTable := NewKVRoutingTable(kvStore)
replicaRegistry := NewReplicaRegistry()
transport := NewGrpcTransport(logger, cfg, routingTable, replicaRegistry, connsPool)
n.storages[cfgStorage.Name] = &RaftEnabledStorage{
Storage: baseStorage, // storage.Storage would be nil initially
transport: transport,
routingTable: routingTable,
replicaRegistry: replicaRegistry,
}
}
return n, nil
}
// SetBaseStorage sets the underlying storage.Storage for a specific RaftEnabledStorage.
func (n *Node) SetBaseStorage(storageName string, baseStorage storage.Storage) error {
raftEnabledStorage, ok := n.storages[storageName]
if !ok {
return fmt.Errorf("no raft enabled storage found for storage %q", storageName)
}
if raftEnabledStorage.Storage != nil {
return fmt.Errorf("base storage already set for storage %q", storageName)
}
raftEnabledStorage.Storage = baseStorage
return nil
}
// GetStorage implements storage.Node interface
func (n *Node) GetStorage(storageName string) (storage.Storage, error) {
wrapper, ok := n.storages[storageName]
if !ok {
return nil, storage.NewStorageNotFoundError(storageName)
}
return wrapper, nil
}