in pkg/jobmgr/cached/job.go [2759:2880]
// RollbackWorkflow moves the job's current workflow into rollback and points
// the job runtime at the configuration version being rolled back to.
//
// Outcomes, in the order they are checked:
//   - no current workflow: a NotFound error is returned;
//   - workflow already in a terminal state: nothing is done, nil is returned;
//   - workflow already ROLLING_BACKWARD: only the runtime configuration
//     version is brought in line with the workflow goal state, if it is not
//     already;
//   - otherwise: the pre-workflow config is copied, the workflow is rolled
//     back to that copy, and the job runtime is updated to the copy's version.
//
// The job lock is held for the whole operation. The job-summary notification
// is deferred so that it fires on every return path after the lock has been
// released; the values it receives stay zero unless the runtime was updated.
func (j *job) RollbackWorkflow(ctx context.Context) error {
	// Snapshots handed to the listeners once the lock is dropped.
	var jobTypeCopy pbjob.JobType
	var jobSummaryCopy *pbjob.JobSummary
	var updateModelCopy *models.UpdateModel

	// Registered before the Unlock defer below, hence executed after it.
	defer func() {
		j.jobFactory.notifyJobSummaryChanged(j.ID(), jobTypeCopy, jobSummaryCopy, updateModelCopy)
	}()

	j.Lock()
	defer j.Unlock()

	// captureSummary records what the listeners should see; it is only
	// invoked after a successful job runtime update.
	captureSummary := func() {
		jobTypeCopy = j.jobType
		jobSummaryCopy, updateModelCopy = j.generateJobSummaryFromCache(j.runtime, j.runtime.GetUpdateID())
	}

	wf, err := j.getCurrentWorkflow(ctx)
	if err != nil {
		return err
	}
	if wf == nil {
		return yarpcerrors.NotFoundErrorf("no workflow found")
	}

	// The workflow cache has to be populated before its state is inspected.
	err = wf.Recover(ctx)
	if err != nil {
		return err
	}

	// A workflow that has already finished has nothing to roll back.
	if IsUpdateStateTerminal(wf.GetState().State) {
		return nil
	}

	// The runtime cache has to be populated before it is read or updated.
	err = j.populateRuntime(ctx)
	if err != nil {
		return err
	}

	if wf.GetState().State == pbupdate.State_ROLLING_BACKWARD {
		// The rollback was started earlier (e.g. this is an error retry).
		// If the runtime already carries the rollback target version there
		// is nothing left to do.
		if j.runtime.GetConfigurationVersion() == wf.GetGoalState().JobVersion {
			return nil
		}

		// Otherwise only the runtime config version needs to catch up.
		if err := j.updateJobRuntime(
			ctx,
			wf.GetGoalState().JobVersion,
			j.runtime.GetWorkflowVersion(),
			wf,
		); err != nil {
			return err
		}

		captureSummary()
		return nil
	}

	// Load the job config that was in effect before the workflow ran.
	jobID := j.ID()
	previous, err := j.jobFactory.jobConfigOps.GetResult(ctx, jobID, wf.GetState().JobVersion)
	if err != nil {
		return errors.Wrap(err,
			"failed to get job config to copy for workflow rolling back")
	}

	// Copy that old config; the copy is what the workflow "rolls back" to.
	rollbackConfig, err := j.copyJobAndTaskConfig(
		ctx,
		previous.JobConfig,
		previous.ConfigAddOn,
		previous.JobSpec,
	)
	if err != nil {
		return errors.Wrap(err,
			"failed to copy job and task config for workflow rolling back")
	}

	// Load the config the workflow was targeting before the rollback.
	targetConfig, _, err := j.jobFactory.jobConfigOps.Get(
		ctx,
		j.ID(),
		wf.GetGoalState().JobVersion,
	)
	if err != nil {
		return errors.Wrap(err,
			"failed to get current job config for workflow rolling back")
	}

	err = wf.Rollback(ctx, targetConfig, rollbackConfig)
	if err != nil {
		return err
	}

	// Point the job runtime at the version of the copied config.
	if err := j.updateJobRuntime(
		ctx,
		rollbackConfig.GetChangeLog().GetVersion(),
		j.runtime.GetWorkflowVersion(),
		wf,
	); err != nil {
		return err
	}

	captureSummary()
	return nil
}