// internal/praefect/replicator.go
package praefect
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/service/repository"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage/storagemgr"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/log"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/config"
"gitlab.com/gitlab-org/gitaly/v16/internal/praefect/datastore"
prommetrics "gitlab.com/gitlab-org/gitaly/v16/internal/prometheus/metrics"
"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/correlation"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
)
// Replicator performs the actual replication logic between two nodes
type Replicator interface {
// Replicate propagates changes from the source to the target
Replicate(ctx context.Context, event datastore.ReplicationEvent, source, target *grpc.ClientConn) error
// Destroy will remove the target repo on the specified target connection
Destroy(ctx context.Context, event datastore.ReplicationEvent, target *grpc.ClientConn) error
}
type defaultReplicator struct {
rs datastore.RepositoryStore
log log.Logger
}
func (dr defaultReplicator) Replicate(ctx context.Context, event datastore.ReplicationEvent, sourceCC, targetCC *grpc.ClientConn) error {
targetRepository := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
RelativePath: event.Job.ReplicaPath,
}
sourceRepository := &gitalypb.Repository{
StorageName: event.Job.SourceNodeStorage,
RelativePath: event.Job.ReplicaPath,
}
logger := dr.log.WithFields(log.Fields{
logWithVirtualStorage: event.Job.VirtualStorage,
logWithReplTarget: event.Job.TargetNodeStorage,
"replication_job_source": event.Job.SourceNodeStorage,
correlation.FieldName: correlation.ExtractFromContext(ctx),
})
generation, err := dr.rs.GetReplicatedGeneration(ctx, event.Job.RepositoryID, event.Job.SourceNodeStorage, event.Job.TargetNodeStorage)
if err != nil {
// A later generation might have already been replicated by an earlier replication job. If that's
// the case, we simply acknowledge the job. This also prevents accidental downgrades from happening.
var downgradeErr datastore.DowngradeAttemptedError
if errors.As(err, &downgradeErr) {
message := "repository downgrade prevented"
if downgradeErr.CurrentGeneration == downgradeErr.AttemptedGeneration {
message = "target repository already on the same generation, skipping replication job"
}
logger.WithError(downgradeErr).Info(message)
return nil
}
return fmt.Errorf("get replicated generation: %w", err)
}
targetRepositoryClient := gitalypb.NewRepositoryServiceClient(targetCC)
if _, err := targetRepositoryClient.ReplicateRepository(ctx, &gitalypb.ReplicateRepositoryRequest{
Source: sourceRepository,
Repository: targetRepository,
}); err != nil {
if errors.Is(err, repository.ErrInvalidSourceRepository) {
if err := dr.rs.DeleteInvalidRepository(ctx, event.Job.RepositoryID, event.Job.SourceNodeStorage); err != nil {
return fmt.Errorf("delete invalid repository: %w", err)
}
logger.Info("invalid repository record removed")
return nil
}
return fmt.Errorf("failed to create repository: %w", err)
}
// Object pool state between the source and target repository should match. Get object pool
// information for the source and target repository and reconcile any detected differences.
sourceObjectPoolClient := gitalypb.NewObjectPoolServiceClient(sourceCC)
targetObjectPoolClient := gitalypb.NewObjectPoolServiceClient(targetCC)
sourceResp, err := sourceObjectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
Repository: sourceRepository,
})
if err != nil {
return err
}
targetResp, err := targetObjectPoolClient.GetObjectPool(ctx, &gitalypb.GetObjectPoolRequest{
Repository: targetRepository,
})
if err != nil {
return err
}
sourcePool := sourceResp.GetObjectPool()
targetPool := targetResp.GetObjectPool()
switch {
// If the source and target object pool state already match, there is nothing to sync.
case sourcePool.GetRepository().GetRelativePath() == targetPool.GetRepository().GetRelativePath():
// If the target repository is linked to a non-matching object pool it must be disconnected.
case targetPool != nil:
if _, err := targetObjectPoolClient.DisconnectGitAlternates(ctx, &gitalypb.DisconnectGitAlternatesRequest{
Repository: targetRepository,
}); err != nil {
return err
}
// If the source repository is not linked to an object pool, the target repository does not
// need to be linked to a new object pool. Otherwise, continue to object pool linking.
if sourcePool == nil {
break
}
fallthrough
// If the source repository is linked to an object pool, link the target repository to the
// matching target object pool.
case targetPool == nil:
targetObjectPool := proto.Clone(sourcePool).(*gitalypb.ObjectPool)
targetObjectPool.GetRepository().StorageName = targetRepository.GetStorageName()
if _, err := targetObjectPoolClient.LinkRepositoryToObjectPool(ctx, &gitalypb.LinkRepositoryToObjectPoolRequest{
ObjectPool: targetObjectPool,
Repository: targetRepository,
}); err != nil {
if !strings.Contains(err.Error(), storagemgr.ErrRepositoriesAreInDifferentPartitions.Error()) {
return err
}
// The pool and the member repository were not in the same partition and thus failed to be linked.
// When transactions are enabled, the pool and the member repository must be in the same partition,
// as operations are only serialized within partitions. Moving a repository into a different partition
// is difficult as one would have to create a new repository in the same partition as the pool, and
// delete the old one. We don't intend to support moving repositories between partitions with Praefect.
// If we hit this error, we'll leave the repository without the alternate link.
}
}
if generation != datastore.GenerationUnknown {
return dr.rs.SetGeneration(ctx,
event.Job.RepositoryID,
event.Job.TargetNodeStorage,
event.Job.RelativePath,
generation,
)
}
return nil
}
func (dr defaultReplicator) Destroy(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
targetRepo := &gitalypb.Repository{
StorageName: event.Job.TargetNodeStorage,
RelativePath: event.Job.ReplicaPath,
}
repoSvcClient := gitalypb.NewRepositoryServiceClient(targetCC)
if _, err := repoSvcClient.RemoveRepository(ctx, &gitalypb.RemoveRepositoryRequest{
Repository: targetRepo,
}); err != nil {
return err
}
// If the repository was deleted but removing the database record fails, we'll know by the repository
// not having a record in the virtual storage but having one for the storage. We can later retry the
// deletion.
if err := dr.rs.DeleteReplica(ctx, event.Job.RepositoryID, event.Job.TargetNodeStorage); err != nil {
if !errors.Is(err, datastore.ErrNoRowsAffected) {
return err
}
dr.log.WithField(correlation.FieldName, correlation.ExtractFromContext(ctx)).
WithError(err).
Info("deleted repository did not have a store entry")
}
return nil
}
// ReplMgr is a replication manager for handling replication jobs
type ReplMgr struct {
log log.Logger
queue datastore.ReplicationEventQueue
hc HealthChecker
nodes NodeSet
storageNamesByVirtualStorage map[string][]string // replicas this replicator is responsible for
replicator Replicator // does the actual replication logic
replInFlightMetric *prometheus.GaugeVec
replLatencyMetric prommetrics.HistogramVec
replDelayMetric prommetrics.HistogramVec
replJobTimeout time.Duration
dequeueBatchSize uint
parallelStorageProcessingWorkers uint
repositoryStore datastore.RepositoryStore
}
// ReplMgrOpt allows a replicator to be configured with additional options
type ReplMgrOpt func(*ReplMgr)
// WithLatencyMetric is an option to set the latency prometheus metric
func WithLatencyMetric(h prommetrics.HistogramVec) func(*ReplMgr) {
return func(m *ReplMgr) {
m.replLatencyMetric = h
}
}
// WithDelayMetric is an option to set the delay prometheus metric
func WithDelayMetric(h prommetrics.HistogramVec) func(*ReplMgr) {
return func(m *ReplMgr) {
m.replDelayMetric = h
}
}
// WithDequeueBatchSize configures the number of events to dequeue in a single batch.
func WithDequeueBatchSize(size uint) func(*ReplMgr) {
return func(m *ReplMgr) {
m.dequeueBatchSize = size
}
}
// WithParallelStorageProcessingWorkers configures the number of workers used to process replication
// events per virtual storage.
func WithParallelStorageProcessingWorkers(n uint) func(*ReplMgr) {
return func(m *ReplMgr) {
m.parallelStorageProcessingWorkers = n
}
}
// NewReplMgr initializes a replication manager with the provided dependencies
// and options
func NewReplMgr(log log.Logger, storageNames map[string][]string, queue datastore.ReplicationEventQueue, rs datastore.RepositoryStore, hc HealthChecker, nodes NodeSet, opts ...ReplMgrOpt) ReplMgr {
r := ReplMgr{
log: log.WithField("component", "replication_manager"),
queue: queue,
replicator: defaultReplicator{rs: rs, log: log.WithField("component", "replicator")},
storageNamesByVirtualStorage: storageNames,
hc: hc,
nodes: nodes,
replInFlightMetric: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "gitaly_praefect_replication_jobs",
Help: "Number of replication jobs in flight.",
}, []string{"virtual_storage", "gitaly_storage", "change_type"},
),
replLatencyMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
replDelayMetric: prometheus.NewHistogramVec(prometheus.HistogramOpts{}, []string{"type"}),
dequeueBatchSize: config.DefaultReplicationConfig().BatchSize,
parallelStorageProcessingWorkers: 1,
repositoryStore: rs,
}
for _, opt := range opts {
opt(&r)
}
for virtual, sn := range storageNames {
if len(sn) < int(r.parallelStorageProcessingWorkers) {
r.log.Info(fmt.Sprintf("parallel processing workers decreased from %d "+
"to %d to match the number of storages in the virtual storage %q",
r.parallelStorageProcessingWorkers, len(sn), virtual,
))
r.parallelStorageProcessingWorkers = uint(len(sn))
}
}
return r
}
//nolint:revive // This is unintentionally missing documentation.
func (r ReplMgr) Describe(ch chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(r, ch)
}
//nolint:revive // This is unintentionally missing documentation.
func (r ReplMgr) Collect(ch chan<- prometheus.Metric) {
r.replInFlightMetric.Collect(ch)
}
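// ReplMgr implements prometheus.Collector via Describe and Collect above, so a
// configured manager can be registered directly (a sketch; replMgr is a ReplMgr value):
//
//	prometheus.MustRegister(replMgr)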
const (
logWithReplTarget = "replication_job_target"
logWithVirtualStorage = "virtual_storage"
)
// ExpBackoffFactory creates exponentially growing durations.
type ExpBackoffFactory struct {
Start, Max time.Duration
}
// Create returns a backoff function based on Start and Max time durations.
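//
// For example (the values are illustrative):
//
//	backoff, reset := ExpBackoffFactory{Start: time.Second, Max: 4 * time.Second}.Create()
//	backoff() // 1s
//	backoff() // 2s
//	backoff() // 4s
//	backoff() // 4s, capped at Max from now on
//	reset()   // the next backoff() returns Start again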
func (b ExpBackoffFactory) Create() (Backoff, BackoffReset) {
const factor = 2
duration := b.Start
return func() time.Duration {
defer func() {
duration *= time.Duration(factor)
if duration >= b.Max {
duration = b.Max
}
}()
return duration
}, func() {
duration = b.Start
}
}
type (
// Backoff returns next backoff.
Backoff func() time.Duration
// BackoffReset resets backoff provider.
BackoffReset func()
)
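// getCorrelationID extracts the correlation ID from the event parameters,
// returning an empty string when none is set.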
func getCorrelationID(params datastore.Params) string {
correlationID := ""
if val, found := params[datastore.CorrelationIDKey]; found {
correlationID, _ = val.(string)
}
return correlationID
}
// BackoffFactory creates backoff function and a reset pair for it.
type BackoffFactory interface {
// Create returns a new backoff provider and a reset function for it.
Create() (Backoff, BackoffReset)
}
// ProcessBacklog starts processing of queued jobs.
// It keeps processing jobs until ctx is done and blocks until
// all backlog processing goroutines have returned.
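//
// A typical invocation could look like this (a sketch; mgr is a ReplMgr and the
// backoff values are illustrative):
//
//	go mgr.ProcessBacklog(ctx, ExpBackoffFactory{Start: time.Second, Max: 5 * time.Second})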
func (r ReplMgr) ProcessBacklog(ctx context.Context, b BackoffFactory) {
var wg sync.WaitGroup
defer wg.Wait()
for virtualStorage := range r.storageNamesByVirtualStorage {
wg.Add(1)
go func(virtualStorage string) {
defer wg.Done()
r.processBacklog(ctx, b, virtualStorage)
}(virtualStorage)
}
}
// ProcessStale starts a background process to acknowledge stale replication jobs.
// It will process jobs until ctx is Done.
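//
// A usage sketch, assuming helper.NewTimerTicker from this codebase's helper package:
//
//	done := mgr.ProcessStale(ctx, helper.NewTimerTicker(time.Minute), time.Hour)
//	<-done // closed once ctx is cancelled and the loop has exited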
func (r ReplMgr) ProcessStale(ctx context.Context, ticker helper.Ticker, staleAfter time.Duration) chan struct{} {
done := make(chan struct{})
go func() {
defer close(done)
ticker.Reset()
for {
select {
case <-ticker.C():
n, err := r.queue.AcknowledgeStale(ctx, staleAfter)
if err != nil {
r.log.WithError(err).Error("background periodical acknowledgement for stale replication jobs")
} else if n > 0 {
logger := r.log.WithFields(log.Fields{"component": "ProcessStale", "count": n})
logger.Info("stale replication jobs deleted")
}
ticker.Reset()
case <-ctx.Done():
return
}
}
}()
return done
}
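// processBacklog spreads the physical storages of a virtual storage across up to
// parallelStorageProcessingWorkers goroutines. Each worker picks a storage from the
// queue, processes a batch of its replication events, and requeues the storage either
// immediately (when it made progress) or after a backoff delay.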
func (r ReplMgr) processBacklog(ctx context.Context, b BackoffFactory, virtualStorage string) {
var wg sync.WaitGroup
defer wg.Wait()
logger := r.log.WithField(logWithVirtualStorage, virtualStorage)
logger.Info("processing started")
// We want to shut down the processing loop gracefully without interrupting in-flight
// operations. That is why we suppress cancellation on the provided context.
appCtx := ctx
ctx = context.WithoutCancel(ctx)
storageNames := r.storageNamesByVirtualStorage[virtualStorage]
type StorageProcessing struct {
StorageName string
Backoff
BackoffReset
}
storagesQueue := make(chan StorageProcessing, len(storageNames))
for _, storageName := range storageNames {
backoff, reset := b.Create()
storagesQueue <- StorageProcessing{StorageName: storageName, Backoff: backoff, BackoffReset: reset}
}
for i := uint(0); i < r.parallelStorageProcessingWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
var storageProcessing StorageProcessing
select {
case <-appCtx.Done():
logger.WithError(appCtx.Err()).Info("processing stopped")
return
case storageProcessing = <-storagesQueue:
}
healthyStorages := r.hc.HealthyNodes()[virtualStorage]
healthy := false
for _, healthyStorageName := range healthyStorages {
if healthyStorageName != storageProcessing.StorageName {
continue
}
healthy = true
break
}
var processedEvents int
if healthy {
target, ok := r.nodes[virtualStorage][storageProcessing.StorageName]
if !ok {
logger.WithField("storage", storageProcessing.StorageName).Error("no connection to target storage")
} else {
processedEvents = r.handleNode(ctx, virtualStorage, target)
}
}
if processedEvents == 0 {
// If the storage is not healthy or there are no events to
// process, we don't put it back into the queue immediately but
// wait for a certain period of time first.
go func() {
select {
case <-time.After(storageProcessing.Backoff()):
storagesQueue <- storageProcessing
case <-appCtx.Done():
logger.WithError(appCtx.Err()).Info("processing stopped")
return
}
}()
} else {
storageProcessing.BackoffReset()
storagesQueue <- storageProcessing
}
}
}()
}
}
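// handleNode dequeues a batch of replication events for the given target storage,
// processes each event, and acknowledges the resulting job states. It returns the
// number of events that were dequeued.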
func (r ReplMgr) handleNode(ctx context.Context, virtualStorage string, target Node) int {
logger := r.log.WithFields(log.Fields{logWithVirtualStorage: virtualStorage, logWithReplTarget: target.Storage})
events, err := r.queue.Dequeue(ctx, virtualStorage, target.Storage, int(r.dequeueBatchSize))
if err != nil {
logger.WithError(err).Error("failed to dequeue replication events")
return 0
}
if len(events) == 0 {
return 0
}
stopHealthUpdate := r.startHealthUpdate(ctx, logger, events)
defer stopHealthUpdate()
eventIDsByState := map[datastore.JobState][]uint64{}
for _, event := range events {
state := r.handleNodeEvent(ctx, logger, target.Connection, event)
eventIDsByState[state] = append(eventIDsByState[state], event.ID)
}
for state, eventIDs := range eventIDsByState {
ackIDs, err := r.queue.Acknowledge(ctx, state, eventIDs)
if err != nil {
logger.WithFields(log.Fields{"state": state, "event_ids": eventIDs}).
WithError(err).
Error("failed to acknowledge replication events")
continue
}
notAckIDs := subtractUint64(ackIDs, eventIDs)
if len(notAckIDs) > 0 {
logger.WithFields(log.Fields{"state": state, "event_ids": notAckIDs}).
WithError(err).
Error("replication events were not acknowledged")
}
}
return len(events)
}
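// startHealthUpdate spawns a goroutine that periodically refreshes the health
// timestamps of the dequeued events so they are not acknowledged as stale while
// still being processed. The returned cancel function stops the updates.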
func (r ReplMgr) startHealthUpdate(ctx context.Context, logger log.Logger, events []datastore.ReplicationEvent) context.CancelFunc {
healthUpdateCtx, healthUpdateCancel := context.WithCancel(ctx)
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
if err := r.queue.StartHealthUpdate(healthUpdateCtx, ticker.C, events); err != nil {
ids := make([]uint64, len(events))
for i, event := range events {
ids[i] = event.ID
}
logger.WithField("event_ids", ids).WithError(err).Error("health update loop")
}
}()
return healthUpdateCancel
}
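// handleNodeEvent processes a single replication event and returns the job state
// the event should be acknowledged with: completed on success, dead when no
// attempts remain, and failed otherwise.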
func (r ReplMgr) handleNodeEvent(ctx context.Context, logger log.Logger, targetConnection *grpc.ClientConn, event datastore.ReplicationEvent) datastore.JobState {
cid := getCorrelationID(event.Meta)
ctx = correlation.ContextWithCorrelation(ctx, cid)
// we want it to be queryable by the common `json.correlation_id` filter
logger = logger.WithField(correlation.FieldName, cid)
// we log all details about the event only once, before processing starts
logger.WithField("event", event).Info("replication job processing started")
if err := r.processReplicationEvent(ctx, event, targetConnection); err != nil {
newState := datastore.JobStateFailed
if event.Attempt <= 0 {
newState = datastore.JobStateDead
}
logger.WithError(err).WithField("new_state", newState).Error("replication job processing finished")
return newState
}
newState := datastore.JobStateCompleted
logger.WithField("new_state", newState).Info("replication job processing finished")
return newState
}
// backfillReplicaPath backfills the replica path in the replication job. As of 14.5, not all jobs are guaranteed
// to have a replica path on them yet. There are a few special-cased jobs which won't be scheduled anymore in 14.6
// and thus do not need to use replica paths.
func (r ReplMgr) backfillReplicaPath(ctx context.Context, event datastore.ReplicationEvent) (string, error) {
switch {
// The reconciler scheduled DeleteReplica jobs which are missing a repository ID. 14.5 has
// dropped this logic and doesn't leave orphaned records anymore as Praefect has a walker
// to identify stale replicas. Any jobs still in flight have been scheduled prior to 14.4 and
// should be handled in the old manner.
case event.Job.Change == datastore.DeleteReplica && event.Job.RepositoryID == 0:
fallthrough
// 14.5 also doesn't schedule DeleteRepo jobs. Any jobs are again old jobs in-flight.
// The repository ID in delete jobs scheduled in 14.4 won't be present anymore at the time the
// replication job is being executed, as the 'repositories' record is deleted. Given that,
// it's not possible to get the replica path. In 14.4, Praefect intercepts deletes and handles
// them without scheduling replication jobs. The 'delete' jobs still in flight are handled as before
// for backwards compatibility.
case event.Job.Change == datastore.DeleteRepo:
return event.Job.RelativePath, nil
default:
replicaPath, err := r.repositoryStore.GetReplicaPath(ctx, event.Job.RepositoryID)
if err != nil {
return "", fmt.Errorf("get replica path: %w", err)
}
return replicaPath, nil
}
}
// ProcessReplicationEvent processes a single replication event given the target client connection
func (r ReplMgr) ProcessReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
return r.processReplicationEvent(ctx, event, targetCC)
}
func (r ReplMgr) processReplicationEvent(ctx context.Context, event datastore.ReplicationEvent, targetCC *grpc.ClientConn) error {
var cancel func()
if r.replJobTimeout > 0 {
ctx, cancel = context.WithTimeout(ctx, r.replJobTimeout)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()
replStart := time.Now()
r.replDelayMetric.WithLabelValues(event.Job.Change.String()).Observe(replStart.Sub(event.CreatedAt).Seconds())
inFlightGauge := r.replInFlightMetric.WithLabelValues(event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.Change.String())
inFlightGauge.Inc()
defer inFlightGauge.Dec()
var err error
event.Job.ReplicaPath, err = r.backfillReplicaPath(ctx, event)
if err != nil {
return fmt.Errorf("choose replica path: %w", err)
}
switch event.Job.Change {
case datastore.UpdateRepo:
source, ok := r.nodes[event.Job.VirtualStorage][event.Job.SourceNodeStorage]
if !ok {
return fmt.Errorf("no connection to source node %q/%q", event.Job.VirtualStorage, event.Job.SourceNodeStorage)
}
ctx, err = storage.InjectGitalyServers(ctx, event.Job.SourceNodeStorage, source.Address, source.Token)
if err != nil {
return fmt.Errorf("inject Gitaly servers into context: %w", err)
}
err = r.replicator.Replicate(ctx, event, source.Connection, targetCC)
case datastore.DeleteRepo, datastore.DeleteReplica:
err = r.replicator.Destroy(ctx, event, targetCC)
default:
err = fmt.Errorf("unknown replication change type encountered: %q", event.Job.Change)
}
if err != nil {
return err
}
r.replLatencyMetric.WithLabelValues(event.Job.Change.String()).Observe(time.Since(replStart).Seconds())
return nil
}
// subtractUint64 returns a new slice containing all elements of l that do not exist in r.
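// For example, subtractUint64([]uint64{1, 2, 3}, []uint64{2, 4}) returns []uint64{1, 3}.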
func subtractUint64(l, r []uint64) []uint64 {
if len(l) == 0 {
return nil
}
if len(r) == 0 {
result := make([]uint64, len(l))
copy(result, l)
return result
}
excludeSet := make(map[uint64]struct{}, len(r))
for _, v := range r {
excludeSet[v] = struct{}{}
}
var result []uint64
for _, v := range l {
if _, found := excludeSet[v]; !found {
result = append(result, v)
}
}
return result
}