func()

in registry/bbm/bbm.go [311:395]


func (jw *Worker) FindJob(ctx context.Context, bbmStore datastore.BackgroundMigrationStore) (*models.BackgroundMigrationJob, error) {
	// Find a Background Migration that needs to be run.
	bbm, err := bbmStore.FindNext(ctx)
	if err != nil {
		return nil, err
	}
	if bbm == nil {
		return nil, nil
	}

	l := jw.logger.WithFields(log.Fields{
		bbmNameKey:         bbm.Name,
		bbmIDKey:           bbm.ID,
		bbmJobSignatureKey: bbm.JobName,
		bbmStatusKey:       bbm.Status.String(),
		bbmBatchSizeKey:    bbm.BatchSize,
		bbmTargetColumnKey: bbm.TargetColumn,
		bbm.TargetTable:    bbm.TargetTable,
	})

	l.Info("a background migration was found that needs to be executed")

	err = validateMigration(ctx, bbmStore, jw.Work, bbm)
	if err != nil {
		l.WithError(err).Error("background migration failed validation")

		if errors.Is(err, datastore.ErrUnknownColumn) || errors.Is(err, datastore.ErrUnknownTable) || errors.Is(err, ErrWorkFunctionNotFound) {
			// Mark migration as failed and surface the error to sentry if we don't know the column or the table referenced in the migration
			var errCode models.BBMErrorCode
			switch {
			case errors.Is(err, datastore.ErrUnknownColumn):
				errCode = models.InvalidColumnBBMErrCode
			case errors.Is(err, datastore.ErrUnknownTable):
				errCode = models.InvalidTableBBMErrCode
			case errors.Is(err, ErrWorkFunctionNotFound):
				errCode = models.InvalidJobSignatureBBMErrCode
			}
			errortracking.Capture(err, errortracking.WithContext(ctx))
			bbm.Status = models.BackgroundMigrationFailed
			bbm.ErrorCode = errCode
			return nil, bbmStore.UpdateStatus(ctx, bbm)
		}
		return nil, err
	}

	var job *models.BackgroundMigrationJob

	done, err := hasRunAllBBMJobsAtLeastOnce(ctx, bbmStore, bbm)
	if err != nil {
		return nil, err
	}

	// if we haven't run all jobs for the Background Migration at least once first find and exhaust all new jobs before considering failed/retryable jobs.
	if !done {
		return findNewJob(ctx, bbmStore, bbm)
	}

	job, err = findRetryableJobs(ctx, bbmStore, bbm)
	if err != nil {
		return nil, err
	}
	if job != nil {
		l := l.WithFields(log.Fields{
			jobIDKey:        job.ID,
			jobBBMIDKey:     job.BBMID,
			jobNameKey:      job.JobName,
			jobAttemptsKey:  job.Attempts,
			jobStartIDKey:   job.StartID,
			jobEndIDKey:     job.EndID,
			jobBatchSizeKey: job.BatchSize,
			jobStatusKey:    job.Status.String(),
			jobColumnKey:    job.PaginationColumn,
			jobTableKey:     job.PaginationTable,
		})

		// check that the selected job does not exceed the configured `MaxJobAttempt`
		if job.Attempts >= jw.maxJobAttempt {
			l.WithError(ErrMaxJobAttemptsReached).Error("marking background migration as failed due to job failure")
			bbm.ErrorCode = models.JobExceedsMaxAttemptBBMErrCode
			bbm.Status = models.BackgroundMigrationFailed
			return nil, bbmStore.UpdateStatus(ctx, bbm)
		}
	}
	return job, nil
}