registry/bbm/bbm.go (402 lines of code) (raw):

//go:generate mockgen -package mocks -destination mocks/bbm.go . Handler package bbm import ( "context" "errors" "fmt" "math/rand/v2" "time" "github.com/docker/distribution/registry/bbm/metrics" "github.com/docker/distribution/log" "github.com/docker/distribution/registry/datastore" "github.com/docker/distribution/registry/datastore/models" "gitlab.com/gitlab-org/labkit/correlation" "gitlab.com/gitlab-org/labkit/errortracking" ) const ( componentKey = "component" workerName = "registry.bbm.Worker" defaultMaxJobAttempt = 5 defaultJobInterval = 1 * time.Minute jobIntervalJitterSeconds = 5 // maxRunTimeout caps the duration of each background migration run to 3 mins. maxRunTimeout = 3 * time.Minute // Background Migration job log keys jobIDKey = "job_id" jobBBMIDKey = "job_bbm_id" jobNameKey = "job_name" jobAttemptsKey = "job_attempts" jobStartIDKey = "job_start_id" jobEndIDKey = "job_end_id" jobBatchSizeKey = "job_batch_size" jobStatusKey = "job_status" jobColumnKey = "job_pagination_column" jobTableKey = "job_pagination_table" // Background Migration log keys bbmIDKey = "bbm_id" bbmNameKey = "bbm_name" bbmBatchSizeKey = "bbm_batch_size" bbmStatusKey = "bbm_status" bbmJobSignatureKey = "bbm_job_signature_key" bbmTargetColumnKey = "bbm_target_column" bbmTargetTableKey = "bbm_target_table" ) var ( // ErrJobEndpointNotFound is returned when a job's endpoint can not be calculated. ErrJobEndpointNotFound = errors.New("job endpoint could not be calculated") // ErrMaxJobAttemptsReached is returned when the maximum attempt to try a job has elapsed. ErrMaxJobAttemptsReached = errors.New("maximum job attempt reached") // ErrWorkFunctionNotFound is returned when a referenced job has no corresponding work function. ErrWorkFunctionNotFound = errors.New("work function not found") ) type WorkFunc func(ctx context.Context, db datastore.Handler, paginationTable, paginationColumn string, paginationAfter, paginationBefore, limit int) error // Work represents the underlying functions that a Background Migration job is capable of executing. type Work struct { // Name must correspond to the `job_signature_name` in `batched_background_migrations` table. Name string // Do is the work function that is assigned to a job. Do WorkFunc } // AllWork is a list of all background migration work functions known to the registry. // When a registry developer wants to link a background migration in the database to a work function, they must make sure the corresponding work entry is added to the list here. // The `Work.name` must correspond to the value in `batched_background_migrations.job_signature_name` column for the specific the background migration, // otherwise the migration will fail to run with an `ErrWorkFunctionNotFound` when picked up. func AllWork() []Work { // nolint: revive // enforce-slice-style return []Work{ // {Name: "ExampleNameThatMatchesTheJobSignatureNameColumn", Do: ExampleDoFunction} } } // RegisterWork registers all known work functions to the Background Migration worker. func RegisterWork(work []Work, opts ...WorkerOption) (*Worker, error) { workMap := make(map[string]Work, 0) for _, val := range work { if _, found := workMap[val.Name]; found { return nil, fmt.Errorf("can not have work with the same name %s", val.Name) } workMap[val.Name] = val } return NewWorker(workMap, opts...), nil } // Worker is the Background Migration agent of execution. It listens for pending Background Migration jobs and tries to execute the corresponding work function. type Worker struct { Work map[string]Work logger log.Logger db datastore.Handler jobInterval time.Duration maxJobAttempt int wh Handler } // Handler defines the methods required for handling background migration jobs. It provides flexibility to use different implementations for job management. // This is currently leveraged in tests to facilitate mocking worker functions. type Handler interface { FindJob(context.Context, datastore.BackgroundMigrationStore) (*models.BackgroundMigrationJob, error) GrabLock(context.Context, datastore.BackgroundMigrationStore) error ExecuteJob(context.Context, datastore.BackgroundMigrationStore, *models.BackgroundMigrationJob) error } // WorkerOption provides functional options for NewWorker. type WorkerOption func(*Worker) // WithJobInterval sets the interval between job scans/runs. Defaults to 1 seconds. func WithJobInterval(d time.Duration) WorkerOption { return func(a *Worker) { a.jobInterval = d } } // WithMaxJobAttempt sets the maximum attempts to try to execute a job when an error occurs. func WithMaxJobAttempt(d int) WorkerOption { return func(jw *Worker) { jw.maxJobAttempt = d } } // WithLogger sets the logger. func WithLogger(l log.Logger) WorkerOption { return func(jw *Worker) { jw.logger = l } } // WithDB sets the DB. func WithDB(db datastore.Handler) WorkerOption { return func(jw *Worker) { jw.db = db } } // WithHandler sets the worker handle. func WithHandler(wh Handler) WorkerOption { return func(jw *Worker) { jw.wh = wh } } func (jw *Worker) applyDefaults() { if jw.logger == nil { jw.logger = log.GetLogger() } if jw.jobInterval == 0 { jw.jobInterval = defaultJobInterval } if jw.maxJobAttempt == 0 { jw.maxJobAttempt = defaultMaxJobAttempt } if jw.wh == nil { jw.wh = jw } } // NewWorker creates a new Worker. func NewWorker(workMap map[string]Work, opts ...WorkerOption) *Worker { jw := &Worker{Work: workMap} jw.applyDefaults() for _, opt := range opts { opt(jw) } jw.logger = jw.logger.WithFields(log.Fields{componentKey: workerName}) return jw } // ListenForBackgroundMigration allows a Worker to inspect the Background Migration datastore for pending jobs that need to be executed and if applicable execute the work function of the pending job. // The inspection of the Background Migration datastore for potential work is carried out at a period of `jobInterval` set in the configuration. // Once a job is found the worker attempts to obtain the distributed Background Migration lock and execute the job function to completion. // However, if the distributed lock is held by another worker in a separate process it returns an `ErrlockInUse` and waits another `jobInterval` duration to try again. func (jw *Worker) ListenForBackgroundMigration(ctx context.Context, doneChan <-chan struct{}) (chan struct{}, error) { // gracefullFinish is used to signal to an upstream processes that a worker has completed any in-flight jobs and the upstream can terminate if needed. gracefullFinish := make(chan struct{}) // Create a period that this worker searches and executes work on (use a random jitter of jobIntervalJitterSeconds for obscurity). // nolint: gosec ticker := time.NewTicker(jw.jobInterval + time.Duration(rand.IntN(jobIntervalJitterSeconds))*time.Second) go func() { for { jw.logger.Info("waiting for next cycle...") select { // The upstream process is terminating, this worker should exit. case <-doneChan: // cleanup jw.logger.Info("received shutdown signal: shutting down...") close(gracefullFinish) return // A period has elapsed, time to try to find and execute any available jobs. case <-ticker.C: jw.logger.Info("starting worker run...") report := metrics.WorkerRun() jw.run(ctx) report() } } }() return gracefullFinish, nil } // run is responsible for orchestrating and executing Background Migrations when found. // it does this by attempting to obtain the Background Migration distributed lock, // before proceeding to find and execute any applicable Background Migration jobs. func (jw *Worker) run(ctx context.Context) { // inject correlation id to logs jw.logger = jw.logger.WithFields(log.Fields{correlation.FieldName: correlation.ExtractFromContextOrGenerate(ctx)}) // Cancel the job worker context when the `maxRunTimeout` duration elapses ctx, cancel := context.WithTimeout(ctx, maxRunTimeout) defer cancel() // start a transaction to run the background migration process tx, err := jw.db.BeginTx(ctx, nil) if err != nil { jw.logger.WithError(err).Error("failed to create database transaction") errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace()) return } defer tx.Rollback() bbmStore := datastore.NewBackgroundMigrationStore(tx) // Grab distributed lock jw.logger.Info("obtaining lock...") if err = jw.wh.GrabLock(ctx, bbmStore); err != nil { if !errors.Is(err, datastore.ErrBackgroundMigrationLockInUse) { errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace()) } jw.logger.WithError(err).Info("failed to obtain lock") return } // Search for available jobs jw.logger.Info("searching for job...") job, err := jw.wh.FindJob(ctx, bbmStore) if err != nil { jw.logger.WithError(err).Error("failed to find job") errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace()) return } if job == nil { jw.logger.Info("no jobs to run...") if err = tx.Commit(); err != nil { jw.logger.WithError(err).Error("failed to commit database transaction") errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace()) } return } l := jw.logger.WithFields(log.Fields{ jobIDKey: job.ID, jobBBMIDKey: job.BBMID, jobNameKey: job.JobName, jobAttemptsKey: job.Attempts, jobStartIDKey: job.StartID, jobEndIDKey: job.EndID, jobBatchSizeKey: job.BatchSize, jobStatusKey: job.Status.String(), jobColumnKey: job.PaginationColumn, jobTableKey: job.PaginationTable, }) // A job was found, lets execute it jw.logger.Info("job found, executing") defer metrics.Job(job.BatchSize, job.JobName, fmt.Sprint(job.BBMID))() err = jw.wh.ExecuteJob(ctx, bbmStore, job) if err != nil { l.WithError(err).Error("failed to execute job") errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace()) return } if err = tx.Commit(); err != nil { jw.logger.WithError(err).Error("failed to commit database transaction") errortracking.Capture(err, errortracking.WithContext(ctx), errortracking.WithStackTrace()) return } metrics.MigrationRecord(job.BatchSize, job.JobName, fmt.Sprint(job.BBMID)) l.Info("finished background migration job run") } // GrabLock attempts to grab the distributed lock used for co-ordination between all Background Migration processes. func (jw *Worker) GrabLock(ctx context.Context, bbmStore datastore.BackgroundMigrationStore) (err error) { // Acquire a lock so no other Background Migration process can run. err = bbmStore.Lock(ctx) if err != nil { return err } jw.logger.Info("obtained lock") return nil } // FindJob checks for any Background Migration job that needs to be executed. // If a job needs to be executed it either fetches the job or creates the job // associated with the chosen Background Migration. func (jw *Worker) FindJob(ctx context.Context, bbmStore datastore.BackgroundMigrationStore) (*models.BackgroundMigrationJob, error) { // Find a Background Migration that needs to be run. bbm, err := bbmStore.FindNext(ctx) if err != nil { return nil, err } if bbm == nil { return nil, nil } l := jw.logger.WithFields(log.Fields{ bbmNameKey: bbm.Name, bbmIDKey: bbm.ID, bbmJobSignatureKey: bbm.JobName, bbmStatusKey: bbm.Status.String(), bbmBatchSizeKey: bbm.BatchSize, bbmTargetColumnKey: bbm.TargetColumn, bbm.TargetTable: bbm.TargetTable, }) l.Info("a background migration was found that needs to be executed") err = validateMigration(ctx, bbmStore, jw.Work, bbm) if err != nil { l.WithError(err).Error("background migration failed validation") if errors.Is(err, datastore.ErrUnknownColumn) || errors.Is(err, datastore.ErrUnknownTable) || errors.Is(err, ErrWorkFunctionNotFound) { // Mark migration as failed and surface the error to sentry if we don't know the column or the table referenced in the migration var errCode models.BBMErrorCode switch { case errors.Is(err, datastore.ErrUnknownColumn): errCode = models.InvalidColumnBBMErrCode case errors.Is(err, datastore.ErrUnknownTable): errCode = models.InvalidTableBBMErrCode case errors.Is(err, ErrWorkFunctionNotFound): errCode = models.InvalidJobSignatureBBMErrCode } errortracking.Capture(err, errortracking.WithContext(ctx)) bbm.Status = models.BackgroundMigrationFailed bbm.ErrorCode = errCode return nil, bbmStore.UpdateStatus(ctx, bbm) } return nil, err } var job *models.BackgroundMigrationJob done, err := hasRunAllBBMJobsAtLeastOnce(ctx, bbmStore, bbm) if err != nil { return nil, err } // if we haven't run all jobs for the Background Migration at least once first find and exhaust all new jobs before considering failed/retryable jobs. if !done { return findNewJob(ctx, bbmStore, bbm) } job, err = findRetryableJobs(ctx, bbmStore, bbm) if err != nil { return nil, err } if job != nil { l := l.WithFields(log.Fields{ jobIDKey: job.ID, jobBBMIDKey: job.BBMID, jobNameKey: job.JobName, jobAttemptsKey: job.Attempts, jobStartIDKey: job.StartID, jobEndIDKey: job.EndID, jobBatchSizeKey: job.BatchSize, jobStatusKey: job.Status.String(), jobColumnKey: job.PaginationColumn, jobTableKey: job.PaginationTable, }) // check that the selected job does not exceed the configured `MaxJobAttempt` if job.Attempts >= jw.maxJobAttempt { l.WithError(ErrMaxJobAttemptsReached).Error("marking background migration as failed due to job failure") bbm.ErrorCode = models.JobExceedsMaxAttemptBBMErrCode bbm.Status = models.BackgroundMigrationFailed return nil, bbmStore.UpdateStatus(ctx, bbm) } } return job, nil } // ExecuteJob attempts to execute the function associated with a Background Migration job from the job's start-end range. func (jw *Worker) ExecuteJob(ctx context.Context, bbmStore datastore.BackgroundMigrationStore, job *models.BackgroundMigrationJob) error { // update the job attempts err := bbmStore.IncrementJobAttempts(ctx, job.ID) if err != nil { return err } // find the job function from the registered work map and execute it. if work, found := jw.Work[job.JobName]; found { defer metrics.InstrumentQuery(job.JobName, fmt.Sprint(job.BBMID))() err := work.Do(ctx, jw.db, job.PaginationTable, job.PaginationColumn, job.StartID, job.EndID, job.BatchSize) if err != nil { jw.logger.WithError(err).Error("failed executing job") job.Status = models.BackgroundMigrationFailed job.ErrorCode = models.UnknownBBMErrorCode return bbmStore.UpdateJobStatus(ctx, job) } job.Status = models.BackgroundMigrationFinished return bbmStore.UpdateJobStatus(ctx, job) } job.Status = models.BackgroundMigrationFailed job.ErrorCode = models.InvalidJobSignatureBBMErrCode err = bbmStore.UpdateJobStatus(ctx, job) if err != nil { return err } return ErrWorkFunctionNotFound } // AllMigrations returns all background migrations. func (jw *Worker) AllMigrations(ctx context.Context) (models.BackgroundMigrations, error) { return datastore.NewBackgroundMigrationStore(jw.db).FindAll(ctx) } // PauseEligibleMigrations pauses all running or active background migrations. func (jw *Worker) PauseEligibleMigrations(ctx context.Context) error { return datastore.NewBackgroundMigrationStore(jw.db).Pause(ctx) } // ResumeEligibleMigrations resumes all paused background migrations. func (jw *Worker) ResumeEligibleMigrations(ctx context.Context) error { return datastore.NewBackgroundMigrationStore(jw.db).Resume(ctx) } // findRetryableJobs looks for jobs that failed prior in the scope of a specific Background Migration. // if no failed jobs are found in the Background Migration it sets the status of the Background Migration to finished. func findRetryableJobs(ctx context.Context, bbmStore datastore.BackgroundMigrationStore, bbm *models.BackgroundMigration) (*models.BackgroundMigrationJob, error) { job, err := bbmStore.FindJobWithStatus(ctx, bbm.ID, models.BackgroundMigrationFailed) if err != nil { return nil, err } // if there are no jobs that failed update the migration to finished state if job == nil { bbm.Status = models.BackgroundMigrationFinished return nil, bbmStore.UpdateStatus(ctx, bbm) } // Otherwise, decorate any found job with the parent (Background Migration) attributes. job.JobName = bbm.JobName job.PaginationTable = bbm.TargetTable job.PaginationColumn = bbm.TargetColumn job.BatchSize = bbm.BatchSize return job, nil } // findNewJob creates the next job in the batch sequence to be run for a Background Migration. func findNewJob(ctx context.Context, bbmStore datastore.BackgroundMigrationStore, bbm *models.BackgroundMigration) (*models.BackgroundMigrationJob, error) { var ( start int last = bbm.EndID ) // find the last job that was created for the Background Migration. lastCreatedJob, err := bbmStore.FindLastJob(ctx, bbm) if err != nil { return nil, err } // if the Background Migration does not have any job, this implies it was never been run/started, so start it! if lastCreatedJob == nil { start = bbm.StartID bbm.Status = models.BackgroundMigrationRunning err = bbmStore.UpdateStatus(ctx, bbm) if err != nil { return nil, err } } else { // Update status to running if migration was left in active state. // This can happen after a pause command (sets migrations to paused) // followed by a resume command (sets paused migrations to active). if bbm.Status == models.BackgroundMigrationActive { bbm.Status = models.BackgroundMigrationRunning err = bbmStore.UpdateStatus(ctx, bbm) if err != nil { return nil, err } } // otherwise find the starting point for the next job that should be created for the Background Migration. start = lastCreatedJob.EndID + 1 // the start point of the job to be created must not be greater than the Background Migration end bound. if start > last { return nil, nil } } // Based on the Background Migration batch size and the start point of the job, find the job's end point. // TODO: we could off-load some of this logic to the store layer where we can potentially craft a query that will give us both start and end job IDs. end, err := bbmStore.FindJobEndFromJobStart(ctx, bbm.TargetTable, bbm.TargetColumn, start, last, bbm.BatchSize) if err != nil { return nil, err } // create the job representation and decorate the job with some of the parent (Background Migration) attributes job := &models.BackgroundMigrationJob{ BBMID: bbm.ID, StartID: start, EndID: end, BatchSize: bbm.BatchSize, JobName: bbm.JobName, PaginationColumn: bbm.TargetColumn, PaginationTable: bbm.TargetTable, } err = bbmStore.CreateNewJob(ctx, job) if err != nil { return nil, err } return job, nil } // hasRunAllBBMJobsAtLeastOnce checks if a Background Migration has run all its jobs at least once. func hasRunAllBBMJobsAtLeastOnce(ctx context.Context, bbmStore datastore.BackgroundMigrationStore, bbm *models.BackgroundMigration) (bool, error) { // Check if any jobs for the selected Background Migration exist with the end bound of the Background Migration, // if it does, it signifies we've run all jobs of the selected Background Migration at least once. finalJob, err := bbmStore.FindJobWithEndID(ctx, bbm.ID, bbm.EndID) if finalJob != nil { return true, err } return false, err } func validateMigration(ctx context.Context, bbmStore datastore.BackgroundMigrationStore, workFuncs map[string]Work, bbm *models.BackgroundMigration) error { if _, ok := workFuncs[bbm.JobName]; !ok { return ErrWorkFunctionNotFound } if err := bbmStore.ValidateMigrationTableAndColumn(ctx, bbm.TargetTable, bbm.TargetColumn); err != nil { return fmt.Errorf("validating migration: %w", err) } return nil }