func()

in pkg/jobmgr/cached/job.go [2759:2880]


// RollbackWorkflow moves the job's current workflow into rollback and points
// the job runtime at the configuration version being rolled back to.
//
// The job lock is held for the whole call. Listeners are notified through
// jobFactory.notifyJobSummaryChanged from a deferred call registered before
// the deferred Unlock, so it executes after the lock has been released. That
// notification fires on every return path; the job type, summary and update
// model passed to it keep their zero values unless the runtime was updated
// by this call.
//
// Outcomes:
//   - an error from any lookup, recovery, copy, rollback or runtime update
//     step is returned (config lookup/copy errors are wrapped with context);
//   - a NotFound yarpc error is returned when there is no current workflow;
//   - nil is returned without changes when the workflow state is terminal,
//     or when the workflow is already ROLLING_BACKWARD and the runtime
//     configuration version already equals the goal-state job version.
func (j *job) RollbackWorkflow(ctx context.Context) error {
	var (
		notifyType    pbjob.JobType
		notifySummary *pbjob.JobSummary
		notifyUpdate  *models.UpdateModel
	)
	// Registered ahead of the Unlock defer so that, with LIFO ordering,
	// listeners are only called once the lock is no longer held.
	defer func() {
		j.jobFactory.notifyJobSummaryChanged(
			j.ID(),
			notifyType,
			notifySummary,
			notifyUpdate,
		)
	}()

	j.Lock()
	defer j.Unlock()

	// snapshot records what the deferred notification should publish.
	// It is only invoked after the job runtime has been updated.
	snapshot := func() {
		notifyType = j.jobType
		notifySummary, notifyUpdate = j.generateJobSummaryFromCache(j.runtime, j.runtime.GetUpdateID())
	}

	wf, err := j.getCurrentWorkflow(ctx)
	switch {
	case err != nil:
		return err
	case wf == nil:
		return yarpcerrors.NotFoundErrorf("no workflow found")
	}

	// The workflow cache has to be loaded before its state can be read.
	if err := wf.Recover(ctx); err != nil {
		return err
	}

	// Nothing to do for a workflow that has reached a terminal state.
	if IsUpdateStateTerminal(wf.GetState().State) {
		return nil
	}

	// The runtime cache has to be loaded before j.runtime is consulted.
	if err := j.populateRuntime(ctx); err != nil {
		return err
	}

	if wf.GetState().State == pbupdate.State_ROLLING_BACKWARD {
		// The workflow is already rolling back. If the runtime config
		// version matches the rollback target there is nothing left to
		// do; this can happen when the call is retried after an error.
		if j.runtime.GetConfigurationVersion() ==
			wf.GetGoalState().JobVersion {
			return nil
		}

		// Otherwise only the runtime config version needs to catch up.
		if err := j.updateJobRuntime(
			ctx,
			wf.GetGoalState().JobVersion,
			j.runtime.GetWorkflowVersion(),
			wf,
		); err != nil {
			return err
		}

		snapshot()
		return nil
	}

	// Fetch the job config that was in effect before the workflow ran.
	previous, err := j.jobFactory.jobConfigOps.GetResult(
		ctx,
		j.ID(),
		wf.GetState().JobVersion,
	)
	if err != nil {
		return errors.Wrap(err,
			"failed to get job config to copy for workflow rolling back")
	}

	// Copy that earlier config; the copy is what the workflow
	// "rolls back" to.
	rollbackConfig, err := j.copyJobAndTaskConfig(
		ctx,
		previous.JobConfig,
		previous.ConfigAddOn,
		previous.JobSpec,
	)
	if err != nil {
		return errors.Wrap(err,
			"failed to copy job and task config for workflow rolling back")
	}

	// Fetch the config the workflow was heading towards before rollback.
	targetConfig, _, err := j.jobFactory.jobConfigOps.Get(
		ctx,
		j.ID(),
		wf.GetGoalState().JobVersion,
	)
	if err != nil {
		return errors.Wrap(err,
			"failed to get current job config for workflow rolling back")
	}

	if err := wf.Rollback(ctx, targetConfig, rollbackConfig); err != nil {
		return err
	}

	// Point the runtime at the version of the copied config.
	if err := j.updateJobRuntime(
		ctx,
		rollbackConfig.GetChangeLog().GetVersion(),
		j.runtime.GetWorkflowVersion(),
		wf,
	); err != nil {
		return err
	}

	snapshot()
	return nil
}