registry/bbm/sync.go

package bbm

import (
	"context"
	"fmt"
	"math/rand/v2"
	"time"

	"github.com/docker/distribution/log"
	"github.com/docker/distribution/registry/datastore"
	"github.com/docker/distribution/registry/datastore/models"
	"gitlab.com/gitlab-org/labkit/correlation"
)

// SyncWorker is the synchronous execution agent for Background Migrations.
type SyncWorker struct {
	work                 map[string]Work
	logger               log.Logger
	db                   datastore.Handler
	maxJobAttempt        int
	maxJobPerBatch       int
	maxBatchTimeout      time.Duration
	lockWaitTimeout      time.Duration
	jobTimeout           time.Duration
	wh                   Handler
	lastRunCompletedBBMs int
}

// SyncWorkerOption provides functional options for NewSyncWorker.
type SyncWorkerOption func(*SyncWorker)

// WithSyncMaxBatchTimeout sets the maximum batch timeout.
func WithSyncMaxBatchTimeout(d time.Duration) SyncWorkerOption {
	return func(a *SyncWorker) {
		a.maxBatchTimeout = d
	}
}

// WithJobTimeout sets the maximum job timeout.
func WithJobTimeout(d time.Duration) SyncWorkerOption {
	return func(a *SyncWorker) {
		a.jobTimeout = d
	}
}

// WithSyncMaxJobPerBatch sets the maximum number of jobs per batch.
func WithSyncMaxJobPerBatch(d int) SyncWorkerOption {
	return func(a *SyncWorker) {
		a.maxJobPerBatch = d
	}
}

// WithSyncMaxJobAttempt sets the maximum number of attempts to execute a job when an error occurs.
func WithSyncMaxJobAttempt(d int) SyncWorkerOption {
	return func(jw *SyncWorker) {
		jw.maxJobAttempt = d
	}
}

// WithSyncLogger sets the logger.
func WithSyncLogger(l log.Logger) SyncWorkerOption {
	return func(jw *SyncWorker) {
		jw.logger = l
	}
}

// WithWorkMap sets the work map.
func WithWorkMap(wm map[string]Work) SyncWorkerOption {
	return func(jw *SyncWorker) {
		jw.work = wm
	}
}

// WithSyncHandler sets the worker handler.
func WithSyncHandler(wh Handler) SyncWorkerOption {
	return func(jw *SyncWorker) {
		jw.wh = wh
	}
}

// NewSyncWorker creates a new SyncWorker with the provided options.
func NewSyncWorker(db datastore.Handler, opts ...SyncWorkerOption) *SyncWorker {
	jw := &SyncWorker{db: db}
	jw.applyDefaults()

	for _, opt := range opts {
		opt(jw)
	}

	jw.logger = jw.logger.WithFields(log.Fields{componentKey: syncWorkerName})

	return jw
}

const (
	syncWorkerName         = "registry.bbm.SyncWorker"
	defaultMaxBatchTimeout = 10 * time.Minute       // Default maximum timeout for a batch
	defaultJobTimeout      = 2 * time.Minute        // Default maximum timeout for a migration
	defaultLockWaitTimeout = 1 * time.Minute        // Default timeout to wait for a lock
	defaultMaxJobPerBatch  = 5                      // Default maximum number of jobs per batch
	minDelayPerRun         = 100 * time.Millisecond // Minimum delay between runs
	maxDelayPerRun         = 2 * time.Second        // Maximum delay between runs
)

// applyDefaults sets the default values for SyncWorker fields if they are not already set.
func (jw *SyncWorker) applyDefaults() {
	if jw.logger == nil {
		jw.logger = log.GetLogger()
	}
	if jw.maxBatchTimeout == 0 {
		jw.maxBatchTimeout = defaultMaxBatchTimeout
	}
	if jw.jobTimeout == 0 {
		jw.jobTimeout = defaultJobTimeout
	}
	if jw.maxJobAttempt == 0 {
		jw.maxJobAttempt = defaultMaxJobAttempt
	}
	if jw.maxJobPerBatch == 0 {
		jw.maxJobPerBatch = defaultMaxJobPerBatch
	}
	if jw.lockWaitTimeout == 0 {
		jw.lockWaitTimeout = defaultLockWaitTimeout
	}
	if jw.work == nil {
		workMap := make(map[string]Work, 0)
		for _, val := range AllWork() {
			workMap[val.Name] = val
		}
		jw.work = workMap
	}
	if jw.wh == nil {
		jw.wh = jw
	}
}
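// The following is an illustrative sketch only, not part of the production wiring: it shows how
// a caller might combine the functional options above when constructing a SyncWorker. The
// function name and the option values are assumptions chosen for the example, not recommended
// defaults.
func newExampleSyncWorker(db datastore.Handler) *SyncWorker {
	// Unset options fall back to the defaults applied in applyDefaults.
	return NewSyncWorker(db,
		WithSyncMaxJobPerBatch(10),
		WithJobTimeout(5*time.Minute),
		WithSyncMaxBatchTimeout(15*time.Minute),
		WithSyncLogger(log.GetLogger()),
	)
}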
// Run executes all unfinished background migrations until they are either finished or a job
// exceeds the maxJobAttempt count.
func (jw *SyncWorker) Run(ctx context.Context) error {
	return jw.runImpl(ctx)
}

// runImpl executes the main loop for processing synchronous background migration jobs. It performs the following steps:
//  1. Starts a new transaction and acquires a distributed lock.
//  2. Processes up to `maxJobPerBatch` jobs or until `maxBatchTimeout` elapses.
//  3. For each job:
//     - Finds an eligible job using FindJob.
//     - If a job is found, executes it using ExecuteJob.
//     - If no job is found, commits the transaction and returns.
//  4. After processing the batch, commits the transaction and releases the lock.
//  5. Waits for a random duration between `minDelayPerRun` and `maxDelayPerRun` before starting the next iteration.
//
// This method continues running until all jobs are processed or an error occurs.
func (jw *SyncWorker) runImpl(ctx context.Context) error {
	jw.lastRunCompletedBBMs = 0
	for {
		jw.logger = jw.logger.WithFields(log.Fields{"batch_id": correlation.SafeRandomID()})
		jw.logger.Info(fmt.Sprintf("starting new batch run of %v jobs", jw.maxJobPerBatch))

		// This loop runs at most `maxJobPerBatch` jobs (or until `maxBatchTimeout` elapses) before committing and releasing the background migration lock.
		// This ensures efficient use of the acquired lock, reducing the number of times we need to challenge the asynchronous process for the lock,
		// while also adding a buffer to avoid overwhelming the database.
		ctx, cancel := context.WithTimeout(ctx, jw.maxBatchTimeout)
		// nolint: revive // defer
		defer cancel()

		// Start a transaction to run the background migration
		tx, err := jw.db.BeginTx(ctx, nil)
		if err != nil {
			return fmt.Errorf("failed to create database transaction: %w", err)
		}
		// nolint: revive // defer
		defer tx.Rollback()

		bbmStore := datastore.NewBackgroundMigrationStore(tx)

		// Grab distributed lock
		lockCtx, lockCancel := context.WithTimeout(ctx, jw.lockWaitTimeout)
		// nolint: revive // defer
		defer lockCancel()
		if err = jw.wh.GrabLock(lockCtx, bbmStore); err != nil {
			jw.logger.WithError(err).Error("failed to obtain lock, terminating batch run")
			return err
		}

		for i := 0; i < jw.maxJobPerBatch; i++ {
			jw.logger = jw.logger.WithFields(log.Fields{correlation.FieldName: correlation.SafeRandomID()})

			// Each job should take at most the lesser of the parent ctx timeout or `jobTimeout`.
			jobCtx, jobCancel := context.WithTimeout(ctx, jw.jobTimeout)
			// nolint: revive // defer
			defer jobCancel()

			job, err := jw.wh.FindJob(jobCtx, bbmStore)
			if err != nil {
				jw.logger.WithError(err).Error("failed to find job, terminating batch run")
				return err
			}
			// nolint: revive // max-control-nesting
			if job != nil {
				l := jw.logger.WithFields(log.Fields{
					jobIDKey:        job.ID,
					jobBBMIDKey:     job.BBMID,
					jobNameKey:      job.JobName,
					jobStartIDKey:   job.StartID,
					jobEndIDKey:     job.EndID,
					jobBatchSizeKey: job.BatchSize,
					jobStatusKey:    job.Status.String(),
					jobColumnKey:    job.PaginationColumn,
					jobTableKey:     job.PaginationTable,
				})
				l.Info("found job, starting execution")
				err := jw.wh.ExecuteJob(jobCtx, bbmStore, job)
				if err != nil {
					l.WithError(err).Error("failed to execute job, terminating batch run")
					return err
				}
				l.Info("job completed")
			} else {
				// Commit the transaction if no more jobs are found
				if err := tx.Commit(); err != nil {
					return err
				}
				jw.logger.Info("no more jobs to run")
				return nil
			}
			jobCancel()
		}

		// Commit the transaction after processing the batch of jobs
		if err := tx.Commit(); err != nil {
			return err
		}
		lockCancel()
		cancel()

		// Randomized delay between `minDelayPerRun` and `maxDelayPerRun`
		// nolint: gosec
		sleep := time.Duration(rand.Int64N(int64(maxDelayPerRun-minDelayPerRun))) + minDelayPerRun
		jw.logger.WithFields(log.Fields{"duration_s": sleep.Seconds()}).
			Info("released lock, sleeping before next run")
		time.Sleep(sleep)
	}
}
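// The following is an illustrative sketch only, showing the extension point exposed by
// WithSyncHandler (for example to stub out the lock/find/execute steps in tests). It assumes
// the Handler interface is satisfied by the three methods runImpl calls on jw.wh, i.e. the
// same methods SyncWorker itself implements below; the type name is hypothetical.
type noopHandler struct{}

// GrabLock pretends the distributed lock was acquired.
func (noopHandler) GrabLock(_ context.Context, _ datastore.BackgroundMigrationStore) error {
	return nil
}

// FindJob reports no eligible jobs, so runImpl commits the transaction and returns immediately.
func (noopHandler) FindJob(_ context.Context, _ datastore.BackgroundMigrationStore) (*models.BackgroundMigrationJob, error) {
	return nil, nil
}

// ExecuteJob is never reached when FindJob returns no job, but is required to satisfy the handler.
func (noopHandler) ExecuteJob(_ context.Context, _ datastore.BackgroundMigrationStore, _ *models.BackgroundMigrationJob) error {
	return nil
}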
// ResumeEligibleMigrations resumes all paused background migrations.
func (jw *SyncWorker) ResumeEligibleMigrations(ctx context.Context) error {
	return datastore.NewBackgroundMigrationStore(jw.db).Resume(ctx)
}

// FindJob finds the next eligible job to be run.
func (jw *SyncWorker) FindJob(ctx context.Context, bbmStore datastore.BackgroundMigrationStore) (*models.BackgroundMigrationJob, error) {
	var (
		job *models.BackgroundMigrationJob
		bbm *models.BackgroundMigration
		err error
	)

	for {
		// Find failed background migrations first
		bbm, job, err = findFailed(ctx, bbmStore)
		if err != nil {
			jw.logger.WithError(err).Error("failed to find failed background migrations job")
			return nil, fmt.Errorf("failed to find failed background migrations job: %w", err)
		}
		if bbm != nil {
			// Update status to running if migration was left in failed state but no longer has any failed migration jobs.
			if job == nil {
				jw.logger.Info("found failed migration without failed jobs, setting it back to running")
				bbm.ErrorCode = models.NullErrCode
				bbm.Status = models.BackgroundMigrationRunning
				// nolint: revive // max-control-nesting
				if err = bbmStore.UpdateStatus(ctx, bbm); err != nil {
					jw.logger.WithError(err).Error("failed to update status of background migration")
					return nil, fmt.Errorf("failed to update status of background migration: %w", err)
				}
				jw.logger.Info("updated migration status, continuing to look for other jobs")
				continue
			}
			enrichJobWithBBMAttributes(job, bbm)
			return job, err
		}

		bbm, job, err = findRunningOrActive(ctx, bbmStore)
		if err != nil {
			jw.logger.WithError(err).Error("failed to find running or active background migrations job")
			return nil, fmt.Errorf("failed to find running or active background migrations job: %w", err)
		}
		if bbm != nil {
			if job == nil {
				jw.logger.Info("found running/active migration without jobs, setting it to finished")
				bbm.ErrorCode = models.NullErrCode
				bbm.Status = models.BackgroundMigrationFinished
				// nolint: revive // max-control-nesting
				if err = bbmStore.UpdateStatus(ctx, bbm); err != nil {
					jw.logger.WithError(err).Error("failed to update status of background migration")
					return nil, fmt.Errorf("failed to update status of background migration: %w", err)
				}
				jw.lastRunCompletedBBMs++
				jw.logger.Info("updated migration status, continuing to look for other jobs")
				continue
			}
		}

		enrichJobWithBBMAttributes(job, bbm)
		return job, err
	}
}
// findRunningOrActive finds the next background migration in the running or active state and the next eligible job to be run under the migration.
func findRunningOrActive(ctx context.Context, bbmStore datastore.BackgroundMigrationStore) (*models.BackgroundMigration, *models.BackgroundMigrationJob, error) {
	var (
		job *models.BackgroundMigrationJob
		bbm *models.BackgroundMigration
	)

	// Find the next running or active background migration
	bbm, err := bbmStore.FindNext(ctx)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to find active/running background migrations: %w", err)
	}
	if bbm == nil {
		// No more background migrations to process, exit loop
		return nil, nil, nil
	}

	// Prioritize failed jobs if any before considering new jobs
	job, err = bbmStore.FindJobWithStatus(ctx, bbm.ID, models.BackgroundMigrationFailed)
	if err != nil {
		return nil, nil, err
	}
	if job == nil {
		// If no failed jobs are found, look for new jobs
		job, err = findNewJob(ctx, bbmStore, bbm)
		if err != nil {
			return nil, nil, err
		}
	}

	return bbm, job, err
}

// findFailed finds the next background migration in the failed state and the next eligible failed job to be run under the migration.
func findFailed(ctx context.Context, bbmStore datastore.BackgroundMigrationStore) (*models.BackgroundMigration, *models.BackgroundMigrationJob, error) {
	var (
		job *models.BackgroundMigrationJob
		bbm *models.BackgroundMigration
		err error
	)

	// Find failed background migrations
	bbm, err = bbmStore.FindNextByStatus(ctx, models.BackgroundMigrationFailed)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to find failed background migrations: %w", err)
	}

	// If a failing background migration is found, find its failed jobs and rerun them
	if bbm != nil {
		// Find failed jobs for the background migration
		job, err = bbmStore.FindJobWithStatus(ctx, bbm.ID, models.BackgroundMigrationFailed)
		if err != nil {
			return bbm, nil, fmt.Errorf("failed to find failed background migrations job: %w", err)
		}
	}

	return bbm, job, err
}
// ExecuteJob executes a background migration's job unit of work.
func (jw *SyncWorker) ExecuteJob(ctx context.Context, bbmStore datastore.BackgroundMigrationStore, job *models.BackgroundMigrationJob) error {
	// Find the job function from the registered work map and execute it.
	if work, found := jw.work[job.JobName]; found {
		for i := 0; i < jw.maxJobAttempt; i++ {
			jw.logger.WithFields(log.Fields{"sync_attempt": i + 1}).Info("executing job")
			err := work.Do(ctx, jw.db, job.PaginationTable, job.PaginationColumn, job.StartID, job.EndID, job.BatchSize)
			if err == nil {
				jw.logger.Info("job execution completed successfully, updating job status")
				job.Status = models.BackgroundMigrationFinished
				return bbmStore.UpdateJobStatus(ctx, job)
			}
			jw.logger.WithError(err).Error("failed executing job, retrying")
		}
	} else {
		jw.logger.WithFields(log.Fields{"job_name": job.JobName}).Error("job function not found in work map")
		return ErrWorkFunctionNotFound
	}

	jw.logger.Error("maximum number of job attempts reached")
	return ErrMaxJobAttemptsReached
}

// GrabLock attempts to grab the distributed lock used for coordination between all Background Migration processes.
func (jw *SyncWorker) GrabLock(ctx context.Context, bbmStore datastore.BackgroundMigrationStore) (err error) {
	jw.logger.Info("attempting to acquire lock...")
	err = bbmStore.SyncLock(ctx)
	if err != nil {
		jw.logger.WithError(err).Error("failed to acquire lock")
		return err
	}
	jw.logger.Info("successfully obtained lock")
	return nil
}

// enrichJobWithBBMAttributes enriches the job with attributes from the background migration.
func enrichJobWithBBMAttributes(job *models.BackgroundMigrationJob, bbm *models.BackgroundMigration) {
	if job != nil && bbm != nil {
		job.JobName = bbm.JobName
		job.PaginationTable = bbm.TargetTable
		job.PaginationColumn = bbm.TargetColumn
		job.BatchSize = bbm.BatchSize
	}
}

// FinishedMigrationCount returns the count of background migrations completed in the last run.
func (jw *SyncWorker) FinishedMigrationCount() int {
	return jw.lastRunCompletedBBMs
}
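// The following is an illustrative sketch only, not part of the production code path: it shows
// how a caller might drive the worker end to end and inspect the result. The function name and
// the ordering (resume paused migrations, then run) are assumptions made for the example.
func runSyncMigrationsExample(ctx context.Context, db datastore.Handler) error {
	worker := NewSyncWorker(db)

	// Put any paused background migrations back into an eligible state before running.
	if err := worker.ResumeEligibleMigrations(ctx); err != nil {
		return fmt.Errorf("resuming paused background migrations: %w", err)
	}

	// Run returns once all eligible jobs are processed or a job exhausts its attempts.
	if err := worker.Run(ctx); err != nil {
		return fmt.Errorf("running background migrations: %w", err)
	}

	// Report how many background migrations were completed during this run.
	worker.logger.WithFields(log.Fields{"finished_migrations": worker.FinishedMigrationCount()}).
		Info("synchronous background migrations completed")
	return nil
}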