internal/praefect/nodes/manager.go
package nodes
import (
"context"
"database/sql"
"errors"
"fmt"
"math/rand"
"sync"
"time"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
gitalyauth "gitlab.com/gitlab-org/gitaly/v16/auth"
"gitlab.com/gitlab-org/gitaly/v16/internal/datastructure"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/protoregistry"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/proxy"
"gitlab.com/gitlab-org/gitaly/v16/internal/grpc/sidechannel"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/metrics"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/middleware"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/nodes/tracker"
prommetrics "gitlab.com/gitlab-org/gitaly/v16/internal/prometheus/metrics"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)
// Shard is a primary with a set of secondaries
type Shard struct {
Primary Node
Secondaries []Node
}
// GetNode returns the node of the shard that matches the given storage name.
func (s Shard) GetNode(storage string) (Node, error) {
if storage == s.Primary.GetStorage() {
return s.Primary, nil
}
for _, node := range s.Secondaries {
if storage == node.GetStorage() {
return node, nil
}
}
return nil, fmt.Errorf("node with storage %q does not exist", storage)
}
// GetHealthySecondaries returns all secondaries of the shard which are
// currently known to be healthy.
func (s Shard) GetHealthySecondaries() []Node {
healthySecondaries := make([]Node, 0, len(s.Secondaries))
for _, secondary := range s.Secondaries {
if !secondary.IsHealthy() {
continue
}
healthySecondaries = append(healthySecondaries, secondary)
}
return healthySecondaries
}
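// A usage sketch (the manager value and the virtual storage name below are
// assumptions for illustration): writes go to the primary while only healthy
// secondaries are considered as replication targets.
//
//	shard, err := nodeMgr.GetShard(ctx, "default")
//	if err != nil {
//		return err
//	}
//	primaryConn := shard.Primary.GetConnection() // write target
//	for _, secondary := range shard.GetHealthySecondaries() {
//		replicateTo(secondary.GetStorage()) // replicateTo is a hypothetical helper
//	}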
// Manager is responsible for returning shards for virtual storages
type Manager interface {
GetShard(ctx context.Context, virtualStorageName string) (Shard, error)
// GetSyncedNode returns a random storage node based on the state of the replication.
// It returns the primary in case there are no up-to-date secondaries or an error occurs.
GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error)
// HealthyNodes returns healthy storages by virtual storage.
HealthyNodes() map[string][]string
// Nodes returns nodes by their virtual storages.
Nodes() map[string][]Node
}
const (
// healthcheckTimeout is the maximum duration allowed for checking a node's health status.
// If the check takes longer, it is considered failed.
healthcheckTimeout = 1 * time.Second
// healthcheckThreshold is the number of consecutive healthpb.HealthCheckResponse_SERVING responses
// necessary for deeming a node "healthy".
healthcheckThreshold = 3
)
// Node represents some metadata of a node as well as a connection
type Node interface {
GetStorage() string
GetAddress() string
GetToken() string
GetConnection() *grpc.ClientConn
// IsHealthy reports whether the node is healthy and can handle requests.
// A node is considered healthy if the last 'healthcheckThreshold' checks were positive.
IsHealthy() bool
// CheckHealth executes a health check for the node and tracks the last 'healthcheckThreshold' checks for it.
CheckHealth(context.Context) (bool, error)
}
// Mgr is a concrete type that adheres to the Manager interface
type Mgr struct {
// strategies is a map of strategies keyed on virtual storage name
strategies map[string]leaderElectionStrategy
// nodes contains nodes by their virtual storages
nodes map[string][]Node
csg datastore.ConsistentStoragesGetter
}
// leaderElectionStrategy defines the interface by which primary and
// secondaries are managed.
type leaderElectionStrategy interface {
start(bootstrapInterval, monitorInterval time.Duration)
stop()
checkNodes(context.Context) error
GetShard(ctx context.Context) (Shard, error)
}
// ErrPrimaryNotHealthy indicates the primary of a shard is not in a healthy state and hence
// should not be used for a new request
var ErrPrimaryNotHealthy = errors.New("primary gitaly is not healthy")
const (
dialTimeout = 10 * time.Second
dialMaxBackoff = 1 * time.Second
)
// Dial dials a node with the necessary interceptors configured.
func Dial(
ctx context.Context,
node *config.Node,
registry *protoregistry.Registry,
errorTracker tracker.ErrorTracker,
handshaker client.Handshaker,
sidechannelRegistry *sidechannel.Registry,
log log.Logger,
) (*grpc.ClientConn, error) {
streamInterceptors := []grpc.StreamClientInterceptor{
grpcprometheus.StreamClientInterceptor,
sidechannel.NewStreamProxy(sidechannelRegistry, log),
}
if errorTracker != nil {
streamInterceptors = append(streamInterceptors, middleware.StreamErrorHandler(registry, errorTracker, node.Storage))
}
unaryInterceptors := []grpc.UnaryClientInterceptor{
grpcprometheus.UnaryClientInterceptor,
sidechannel.NewUnaryProxy(sidechannelRegistry, log),
}
b := backoff.DefaultConfig
b.MaxDelay = dialMaxBackoff
dialOpts := []grpc.DialOption{
grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
grpc.WithPerRPCCredentials(gitalyauth.RPCCredentialsV2(node.Token)),
grpc.WithChainStreamInterceptor(streamInterceptors...),
grpc.WithChainUnaryInterceptor(unaryInterceptors...),
grpc.WithConnectParams(grpc.ConnectParams{Backoff: b}),
client.UnaryInterceptor(),
client.StreamInterceptor(),
}
return client.New(ctx, node.Address, client.WithGrpcOptions(dialOpts), client.WithHandshaker(handshaker))
}
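// A minimal dialing sketch; node, registry, handshaker, sidechannelRegistry and
// logger are assumed to be set up by the caller. Passing a nil error tracker is
// valid and simply disables the error-tracking interceptors, just like NewManager
// does when failover is disabled.
//
//	conn, err := Dial(ctx, node, registry, nil, handshaker, sidechannelRegistry, logger)
//	if err != nil {
//		return err
//	}
//	defer conn.Close()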
// NewManager creates a new Mgr based on virtual storage configs.
func NewManager(
log log.Logger,
c config.Config,
db *sql.DB,
csg datastore.ConsistentStoragesGetter,
latencyHistogram prommetrics.HistogramVec,
registry *protoregistry.Registry,
errorTracker tracker.ErrorTracker,
handshaker client.Handshaker,
sidechannelRegistry *sidechannel.Registry,
) (*Mgr, error) {
if !c.Failover.Enabled {
errorTracker = nil
}
ctx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
nodes := make(map[string][]Node, len(c.VirtualStorages))
strategies := make(map[string]leaderElectionStrategy, len(c.VirtualStorages))
for _, virtualStorage := range c.VirtualStorages {
log = log.WithField("virtual_storage", virtualStorage.Name)
ns := make([]*nodeStatus, 0, len(virtualStorage.Nodes))
for _, node := range virtualStorage.Nodes {
conn, err := Dial(ctx, node, registry, errorTracker, handshaker, sidechannelRegistry, log)
if err != nil {
return nil, err
}
cs := newConnectionStatus(*node, conn, log, latencyHistogram, errorTracker)
ns = append(ns, cs)
}
for _, node := range ns {
nodes[virtualStorage.Name] = append(nodes[virtualStorage.Name], node)
}
if c.Failover.Enabled {
if c.Failover.ElectionStrategy == config.ElectionStrategySQL {
strategies[virtualStorage.Name] = newSQLElector(virtualStorage.Name, c, db, log, ns)
} else {
strategies[virtualStorage.Name] = newLocalElector(virtualStorage.Name, log, ns)
}
} else {
strategies[virtualStorage.Name] = newDisabledElector(virtualStorage.Name, ns)
}
}
return &Mgr{
strategies: strategies,
nodes: nodes,
csg: csg,
}, nil
}
// Start bootstraps the node manager by running health checks on the nodes as well as kicking off
// the monitoring process. Start must be called before the Mgr can be used.
func (n *Mgr) Start(bootstrapInterval, monitorInterval time.Duration) {
for _, strategy := range n.strategies {
strategy.start(bootstrapInterval, monitorInterval)
}
}
// Stop stops all monitoring processes and closes the connections. It must only be called once.
func (n *Mgr) Stop() {
for _, strategy := range n.strategies {
strategy.stop()
}
for _, nodes := range n.nodes {
for _, node := range nodes {
_ = node.GetConnection().Close()
}
}
}
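// A lifecycle sketch; all constructor arguments are assumed to exist in the
// surrounding code and the intervals are illustrative only.
//
//	mgr, err := NewManager(logger, cfg, db, csg, latencyHistogram, registry, errorTracker, handshaker, sidechannelRegistry)
//	if err != nil {
//		return err
//	}
//	mgr.Start(1*time.Second, 3*time.Second)
//	defer mgr.Stop()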
// checkShards performs health checks on all the available shards. The
// election strategy is responsible for determining the criteria for
// when to elect a new primary and when a node is down.
func (n *Mgr) checkShards() {
for _, strategy := range n.strategies {
ctx := context.Background()
//nolint:errcheck // We don't care about this error. The nodes manager is only
// used for the deprecated SQL elector anyway.
strategy.checkNodes(ctx)
}
}
// ErrVirtualStorageNotExist indicates the node manager is not aware of the virtual storage for which a shard is being requested
var ErrVirtualStorageNotExist = errors.New("virtual storage does not exist")
// GetShard retrieves a shard for a virtual storage name
func (n *Mgr) GetShard(ctx context.Context, virtualStorageName string) (Shard, error) {
strategy, ok := n.strategies[virtualStorageName]
if !ok {
return Shard{}, fmt.Errorf("virtual storage %q: %w", virtualStorageName, ErrVirtualStorageNotExist)
}
return strategy.GetShard(ctx)
}
// GetPrimary returns the current primary of a repository. This is an adapter so the node manager can be
// used as a praefect.PrimaryGetter in newer code that is written to support repository-specific primaries.
func (n *Mgr) GetPrimary(ctx context.Context, virtualStorage string, _ int64) (string, error) {
shard, err := n.GetShard(ctx, virtualStorage)
if err != nil {
return "", err
}
return shard.Primary.GetStorage(), nil
}
// GetSyncedNode returns a random healthy node whose replica of the repository is up to date. If the
// datastore has no information about the repository yet, the shard's primary is used as the only candidate.
func (n *Mgr) GetSyncedNode(ctx context.Context, virtualStorageName, repoPath string) (Node, error) {
_, upToDateStorages, err := n.csg.GetConsistentStorages(ctx, virtualStorageName, repoPath)
if err != nil && !errors.Is(err, datastore.ErrRepositoryNotFound) {
return nil, err
}
if upToDateStorages == nil || upToDateStorages.IsEmpty() {
// This is possible when there is no data yet in the database for the repository.
shard, err := n.GetShard(ctx, virtualStorageName)
if err != nil {
return nil, fmt.Errorf("get shard for %q: %w", virtualStorageName, err)
}
upToDateStorages = datastructure.SetFromValues(shard.Primary.GetStorage())
}
healthyStorages := make([]Node, 0, upToDateStorages.Len())
for _, node := range n.Nodes()[virtualStorageName] {
if !node.IsHealthy() {
continue
}
if !upToDateStorages.HasValue(node.GetStorage()) {
continue
}
healthyStorages = append(healthyStorages, node)
}
if len(healthyStorages) == 0 {
return nil, fmt.Errorf("no healthy nodes: %w", ErrPrimaryNotHealthy)
}
return healthyStorages[rand.Intn(len(healthyStorages))], nil
}
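// A read-routing sketch (the virtual storage name and repository path are hypothetical):
//
//	node, err := mgr.GetSyncedNode(ctx, "default", "@hashed/ab/cd/abcd.git")
//	if err != nil {
//		return err
//	}
//	conn := node.GetConnection() // serve the read-only RPC via this connection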
// HealthyNodes returns the storages currently considered healthy, keyed by virtual storage.
func (n *Mgr) HealthyNodes() map[string][]string {
healthy := make(map[string][]string, len(n.nodes))
for vs, nodes := range n.nodes {
storages := make([]string, 0, len(nodes))
for _, node := range nodes {
if node.IsHealthy() {
storages = append(storages, node.GetStorage())
}
}
healthy[vs] = storages
}
return healthy
}
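// A consumption sketch ("default" is a hypothetical virtual storage name):
//
//	healthy := mgr.HealthyNodes()
//	storages := healthy["default"] // e.g. []string{"gitaly-1", "gitaly-2"}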
// Nodes returns all nodes keyed by their virtual storage.
func (n *Mgr) Nodes() map[string][]Node { return n.nodes }
func newConnectionStatus(node config.Node, cc *grpc.ClientConn, l log.Logger, latencyHist prommetrics.HistogramVec, errorTracker tracker.ErrorTracker) *nodeStatus {
return &nodeStatus{
node: node,
clientConn: cc,
log: l,
latencyHist: latencyHist,
errTracker: errorTracker,
}
}
type nodeStatus struct {
node config.Node
clientConn *grpc.ClientConn
log log.Logger
latencyHist prommetrics.HistogramVec
mtx sync.RWMutex
statuses []bool
errTracker tracker.ErrorTracker
}
// GetStorage gets the storage name of a node
func (n *nodeStatus) GetStorage() string {
return n.node.Storage
}
// GetAddress gets the address of a node
func (n *nodeStatus) GetAddress() string {
return n.node.Address
}
// GetToken gets the token of a node
func (n *nodeStatus) GetToken() string {
return n.node.Token
}
// GetConnection gets the client connection of a node
func (n *nodeStatus) GetConnection() *grpc.ClientConn {
return n.clientConn
}
func (n *nodeStatus) IsHealthy() bool {
n.mtx.RLock()
healthy := n.isHealthy()
n.mtx.RUnlock()
return healthy
}
func (n *nodeStatus) isHealthy() bool {
if len(n.statuses) < healthcheckThreshold {
return false
}
for _, ok := range n.statuses[len(n.statuses)-healthcheckThreshold:] {
if !ok {
return false
}
}
return true
}
func (n *nodeStatus) updateStatus(status bool) {
n.mtx.Lock()
n.statuses = append(n.statuses, status)
if len(n.statuses) > healthcheckThreshold {
n.statuses = n.statuses[1:]
}
n.mtx.Unlock()
}
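// For illustration, with healthcheckThreshold = 3 the recorded statuses behave as
// a three-element sliding window (hypothetical sequence of check results):
//
//	updateStatus(true)  -> [true]               isHealthy() == false (fewer than 3 samples)
//	updateStatus(true)  -> [true, true]         isHealthy() == false
//	updateStatus(true)  -> [true, true, true]   isHealthy() == true
//	updateStatus(false) -> [true, true, false]  isHealthy() == false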
func (n *nodeStatus) CheckHealth(ctx context.Context) (bool, error) {
health := healthpb.NewHealthClient(n.clientConn)
if n.errTracker != nil {
health = tracker.NewHealthClient(health, n.GetStorage(), n.errTracker)
}
ctx, cancel := context.WithTimeout(ctx, healthcheckTimeout)
defer cancel()
start := time.Now()
resp, err := health.Check(ctx, &healthpb.HealthCheckRequest{})
n.latencyHist.WithLabelValues(n.node.Storage).Observe(time.Since(start).Seconds())
if err != nil {
n.log.WithError(err).WithFields(log.Fields{
"storage": n.node.Storage,
"address": n.node.Address,
}).Warn("error when pinging healthcheck")
}
status := resp.GetStatus() == healthpb.HealthCheckResponse_SERVING
metrics.NodeLastHealthcheckGauge.WithLabelValues(n.GetStorage()).Set(metrics.BoolAsFloat(status))
n.updateStatus(status)
return status, err
}
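// A polling sketch; in practice the election strategies drive this loop, and the
// monitor interval used here is an assumption.
//
//	ticker := time.NewTicker(monitorInterval)
//	defer ticker.Stop()
//	for range ticker.C {
//		// A failed check is already logged and recorded by CheckHealth itself.
//		_, _ = node.CheckHealth(ctx)
//	}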