in registry/bbm/bbm.go [311:395]
// FindJob looks up the next Background Migration returned by bbmStore.FindNext
// and selects a job to execute for it.
//
// Results:
//   - (nil, nil) when FindNext reports no Background Migration.
//   - (nil, err) when a store lookup fails, or when validation fails with an
//     error other than the three recognised ones listed below.
//   - (nil, <UpdateStatus result>) when the migration is marked as failed,
//     which happens when validation fails with datastore.ErrUnknownColumn,
//     datastore.ErrUnknownTable or ErrWorkFunctionNotFound, or when the
//     selected retryable job has reached jw.maxJobAttempt attempts.
//   - the result of findNewJob while not every job of the migration has run at
//     least once.
//   - otherwise the result of findRetryableJobs, which may be a nil job.
func (jw *Worker) FindJob(ctx context.Context, bbmStore datastore.BackgroundMigrationStore) (*models.BackgroundMigrationJob, error) {
	// The target table must be logged under a fixed field name; using the table
	// name itself as the key would produce a different field per migration.
	// NOTE(review): if the package already declares a target-table key constant
	// next to bbmTargetColumnKey, use that constant here instead.
	const targetTableLogKey = "bbm_target_table"

	// Find a Background Migration that needs to be run.
	bbm, err := bbmStore.FindNext(ctx)
	if err != nil {
		return nil, err
	}
	if bbm == nil {
		return nil, nil
	}

	l := jw.logger.WithFields(log.Fields{
		bbmNameKey:         bbm.Name,
		bbmIDKey:           bbm.ID,
		bbmJobSignatureKey: bbm.JobName,
		bbmStatusKey:       bbm.Status.String(),
		bbmBatchSizeKey:    bbm.BatchSize,
		bbmTargetColumnKey: bbm.TargetColumn,
		targetTableLogKey:  bbm.TargetTable,
	})
	l.Info("a background migration was found that needs to be executed")

	if err := validateMigration(ctx, bbmStore, jw.Work, bbm); err != nil {
		l.WithError(err).Error("background migration failed validation")

		// Only an unknown column, unknown table or missing work function marks
		// the migration as failed; any other validation error is returned as is
		// and the migration status is left untouched.
		var errCode models.BBMErrorCode
		switch {
		case errors.Is(err, datastore.ErrUnknownColumn):
			errCode = models.InvalidColumnBBMErrCode
		case errors.Is(err, datastore.ErrUnknownTable):
			errCode = models.InvalidTableBBMErrCode
		case errors.Is(err, ErrWorkFunctionNotFound):
			errCode = models.InvalidJobSignatureBBMErrCode
		default:
			return nil, err
		}

		// Surface the error to the error tracker and persist the failed status.
		errortracking.Capture(err, errortracking.WithContext(ctx))
		bbm.Status = models.BackgroundMigrationFailed
		bbm.ErrorCode = errCode
		return nil, bbmStore.UpdateStatus(ctx, bbm)
	}

	done, err := hasRunAllBBMJobsAtLeastOnce(ctx, bbmStore, bbm)
	if err != nil {
		return nil, err
	}
	// If we haven't run all jobs for the Background Migration at least once,
	// first find and exhaust all new jobs before considering failed/retryable jobs.
	if !done {
		return findNewJob(ctx, bbmStore, bbm)
	}

	job, err := findRetryableJobs(ctx, bbmStore, bbm)
	if err != nil {
		return nil, err
	}
	if job == nil {
		return nil, nil
	}

	// Check that the selected job does not exceed the configured `MaxJobAttempt`.
	if job.Attempts >= jw.maxJobAttempt {
		jobLog := l.WithFields(log.Fields{
			jobIDKey:        job.ID,
			jobBBMIDKey:     job.BBMID,
			jobNameKey:      job.JobName,
			jobAttemptsKey:  job.Attempts,
			jobStartIDKey:   job.StartID,
			jobEndIDKey:     job.EndID,
			jobBatchSizeKey: job.BatchSize,
			jobStatusKey:    job.Status.String(),
			jobColumnKey:    job.PaginationColumn,
			jobTableKey:     job.PaginationTable,
		})
		jobLog.WithError(ErrMaxJobAttemptsReached).Error("marking background migration as failed due to job failure")
		bbm.ErrorCode = models.JobExceedsMaxAttemptBBMErrCode
		bbm.Status = models.BackgroundMigrationFailed
		return nil, bbmStore.UpdateStatus(ctx, bbm)
	}

	return job, nil
}