internal/praefect/router.go (100 lines of code) (raw):
package praefect
import (
"context"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"google.golang.org/grpc"
)
// Keys of the structured log fields that the route types in this file attach
// to a request's log entry when a routing decision is recorded.
const (
// logFieldRepositoryID keys the ID of the repository being routed.
logFieldRepositoryID = "repository_id"
// logFieldReplicaPath keys the disk path of the repository's replicas.
logFieldReplicaPath = "replica_path"
// logFieldAdditionalReplicaPath keys the disk path of the additional
// repository that a request may reference.
logFieldAdditionalReplicaPath = "additional_replica_path"
// logFieldPrimary keys the primary node of a mutator route.
logFieldPrimary = "primary"
// logFieldSecondaries keys the storage names of the secondary nodes of a
// mutator route.
logFieldSecondaries = "secondaries"
// logFieldStorage keys the storage name of a single node.
logFieldStorage = "storage"
)
// additionalRepositoryNotFoundError reports that the additional repository
// referenced by a request could not be found.
type additionalRepositoryNotFoundError struct {
	// storageName presumably names the storage the lookup was performed
	// against; it is not part of the error message.
	storageName string
	// relativePath presumably is the relative path of the missing repository;
	// it is not part of the error message.
	relativePath string
}

// Error implements the error interface. The message is constant and does not
// include the storage name or the relative path carried by the error value.
func (additionalRepositoryNotFoundError) Error() string {
	const message = "additional repository not found"
	return message
}
// addRouteLogField records the given fields under the "route" metadata key of
// the custom log fields carried by ctx. It is a no-op when ctx carries no
// custom fields.
func addRouteLogField(ctx context.Context, fields log.Fields) {
	customFields := log.CustomFieldsFromContext(ctx)
	if customFields == nil {
		return
	}

	customFields.RecordMetadata("route", fields)
}
// routerNodeStorages returns the storage name of every given node, in the
// same order as the input. The result is never nil, even for empty input.
func routerNodeStorages(secondaries []RouterNode) []string {
	names := make([]string, 0, len(secondaries))
	for _, node := range secondaries {
		names = append(names, node.Storage)
	}

	return names
}
// RepositoryAccessorRoute is the routing decision for a repository scoped
// accessor call.
type RepositoryAccessorRoute struct {
	// ReplicaPath is the path on disk under which the replicas are stored.
	ReplicaPath string
	// Node holds the details of the node chosen to handle the request.
	Node RouterNode
}

// addLogFields records the replica path and the storage name of the chosen
// node as route log fields on ctx.
func (r RepositoryAccessorRoute) addLogFields(ctx context.Context) {
	fields := log.Fields{
		logFieldStorage:     r.Node.Storage,
		logFieldReplicaPath: r.ReplicaPath,
	}

	addRouteLogField(ctx, fields)
}
// RouterNode carries the part of a node's configuration that request routing
// needs.
type RouterNode struct {
	// Storage is the name of the node's storage.
	Storage string
	// Connection is the gRPC connection to the node.
	Connection *grpc.ClientConn
}

// addLogFields records the node's storage name as a route log field on ctx.
// The connection is not logged.
func (r RouterNode) addLogFields(ctx context.Context) {
	fields := log.Fields{logFieldStorage: r.Storage}

	addRouteLogField(ctx, fields)
}
// StorageMutatorRoute describes how to route a storage scoped mutator call.
type StorageMutatorRoute struct {
	// Primary is the primary node of the routing decision.
	Primary RouterNode
	// Secondaries are the secondary nodes of the routing decision.
	Secondaries []RouterNode
}

// addLogFields records the storage names of the primary and of the
// secondaries as route log fields on ctx.
func (r StorageMutatorRoute) addLogFields(ctx context.Context) {
	addRouteLogField(ctx, log.Fields{
		// Log only the storage name: the RouterNode also carries the gRPC
		// connection, which does not belong in a log entry. This also matches
		// how the secondaries are logged.
		logFieldPrimary:     r.Primary.Storage,
		logFieldSecondaries: routerNodeStorages(r.Secondaries),
	})
}
// RepositoryMutatorRoute describes how to route a repository scoped mutator call.
type RepositoryMutatorRoute struct {
	// RepositoryID is the repository's ID as Praefect identifies it.
	RepositoryID int64
	// ReplicaPath is the disk path where the replicas are stored.
	ReplicaPath string
	// AdditionalReplicaPath is the disk path where the possible additional repository in the request
	// is stored. This is only used for object pools.
	AdditionalReplicaPath string
	// Primary is the primary node of the transaction.
	Primary RouterNode
	// Secondaries are the secondary nodes participating in a transaction.
	Secondaries []RouterNode
	// ReplicationTargets are additional nodes that do not participate in a transaction
	// but need the changes replicated.
	ReplicationTargets []string
}

// addLogFields records the repository ID, both replica paths, the storage
// names of the primary and of the secondaries, and the replication targets as
// route log fields on ctx.
func (r RepositoryMutatorRoute) addLogFields(ctx context.Context) {
	addRouteLogField(ctx, log.Fields{
		logFieldRepositoryID:          r.RepositoryID,
		logFieldReplicaPath:           r.ReplicaPath,
		logFieldAdditionalReplicaPath: r.AdditionalReplicaPath,
		// Log only the storage name: the RouterNode also carries the gRPC
		// connection, which does not belong in a log entry. This also matches
		// how the secondaries are logged.
		logFieldPrimary:       r.Primary.Storage,
		logFieldSecondaries:   routerNodeStorages(r.Secondaries),
		"replication_targets": r.ReplicationTargets,
	})
}
// RepositoryMaintenanceRoute is the routing decision for a repository scoped
// maintenance call.
type RepositoryMaintenanceRoute struct {
	// RepositoryID is the ID under which Praefect identifies the repository.
	RepositoryID int64
	// ReplicaPath is the path on disk under which the replicas are stored.
	ReplicaPath string
	// Nodes lists every node the call should be routed to.
	Nodes []RouterNode
}

// addLogFields records the repository ID, the replica path and the storage
// names of all target nodes as route log fields on ctx.
func (r RepositoryMaintenanceRoute) addLogFields(ctx context.Context) {
	storages := routerNodeStorages(r.Nodes)

	fields := log.Fields{
		"storages":           storages,
		logFieldReplicaPath:  r.ReplicaPath,
		logFieldRepositoryID: r.RepositoryID,
	}

	addRouteLogField(ctx, fields)
}
// Router decides which nodes to direct accessor, mutator and maintenance RPCs to.
type Router interface {
// RouteStorageAccessor returns the node which should serve the storage accessor request
// for the given virtual storage.
RouteStorageAccessor(ctx context.Context, virtualStorage string) (RouterNode, error)
// RouteStorageMutator returns a route to the primary and secondary nodes that should handle
// the storage mutator request for the given virtual storage.
RouteStorageMutator(ctx context.Context, virtualStorage string) (StorageMutatorRoute, error)
// RouteRepositoryAccessor returns the node that should serve the repository accessor
// request. If forcePrimary is set to `true`, it returns the primary node.
RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RepositoryAccessorRoute, error)
// RouteRepositoryMutator returns a route to the primary and secondary nodes that should handle
// the repository mutator request. Additionally, it returns nodes which do not participate in
// the transaction, but to which the change should be replicated. RouteRepositoryMutator should
// only be used with existing repositories.
RouteRepositoryMutator(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
// RouteRepositoryCreation returns the primary and secondaries that should handle the repository
// creation request. It is up to the caller to store the assignments and primary information
// after finishing the RPC.
RouteRepositoryCreation(ctx context.Context, virtualStorage, relativePath, additionalRepoRelativePath string) (RepositoryMutatorRoute, error)
// RouteRepositoryMaintenance routes the given maintenance-style RPC to all nodes which
// should perform maintenance. This would typically include all online nodes, regardless of
// whether the repository hosted by them is up-to-date or not. Maintenance tasks should
// never be replicated.
RouteRepositoryMaintenance(ctx context.Context, virtualStorage, relativePath string) (RepositoryMaintenanceRoute, error)
}