func()

in pkg/jobmgr/cached/job.go [2335:2498]


// CreateWorkflow creates a workflow of workflowType for the job and returns
// its update ID together with the job's resulting entity version.
//
// The request is rejected when entityVersion fails ValidateEntityVersion, or
// when the job's goal state is terminal while its current state is not and
// updateConfig asks for tasks to be started. When isWorkflowNoop reports that
// the request changes nothing, no workflow is created: the current update's
// ID is returned, and the only thing persisted is a change in opaque data
// (together with a workflow version bump).
//
// The returned update ID and error can both be non-nil: when writing the job
// runtime fails, the ID of the update involved is returned with the error.
func (j *job) CreateWorkflow(
	ctx context.Context,
	workflowType models.WorkflowType,
	updateConfig *pbupdate.UpdateConfig,
	entityVersion *v1alphapeloton.EntityVersion,
	options ...Option,
) (*peloton.UpdateID, *v1alphapeloton.EntityVersion, error) {
	// Filled in only after a workflow has been created successfully; on every
	// other return path the deferred notification sees their zero values.
	var (
		notifyJobType pbjob.JobType
		notifySummary *pbjob.JobSummary
		notifyUpdate  *models.UpdateModel
	)
	// Deferred before the lock is taken so that it runs after the deferred
	// Unlock below: listeners are notified once the lock has been dropped.
	defer func() {
		j.jobFactory.notifyJobSummaryChanged(j.ID(), notifyJobType, notifySummary, notifyUpdate)
	}()

	j.Lock()
	defer j.Unlock()

	if err := j.ValidateEntityVersion(ctx, entityVersion); err != nil {
		return nil, nil, err
	}

	// Goal state is terminal but the job has not reached a terminal state
	// yet: refuse a workflow that would start tasks.
	terminating := util.IsPelotonJobStateTerminal(j.runtime.GetGoalState()) &&
		!util.IsPelotonJobStateTerminal(j.runtime.GetState())
	if terminating && updateConfig.GetStartTasks() {
		return nil, nil, yarpcerrors.AbortedErrorf(
			"job is being terminated, cannot update with start_pods set now",
		)
	}

	current, err := j.getCurrentUpdate(ctx)
	if err != nil {
		return nil, nil, err
	}

	wfOpts := &workflowOpts{}
	for _, o := range options {
		o.apply(wfOpts)
	}

	noop := j.isWorkflowNoop(
		ctx,
		wfOpts.prevJobConfig,
		wfOpts.jobConfig,
		updateConfig,
		workflowType,
		current,
	)
	if noop {
		requestedData := wfOpts.opaqueData.GetData()
		if requestedData != current.GetOpaqueData().GetData() {
			// Bump the workflow version before touching the opaque data so
			// that the opaque data change also changes the entity version.
			// User behavior may depend on opaque data, so peloton has to
			// make sure the user acts on up-to-date opaque data.
			j.runtime.WorkflowVersion++
			versionOnly := &pbjob.RuntimeInfo{WorkflowVersion: j.runtime.GetWorkflowVersion()}
			merged := j.mergeRuntime(versionOnly)
			if err := j.jobFactory.jobRuntimeOps.Upsert(ctx, j.id, merged); err != nil {
				// The in-memory version was already incremented above, so
				// drop the cache instead of keeping an unpersisted value.
				j.invalidateCache()
				return current.GetUpdateID(), nil,
					errors.Wrap(err, "fail to update job runtime when create workflow")
			}

			// TODO: move this under update cache object
			current.OpaqueData = &peloton.OpaqueData{Data: requestedData}
			current.UpdateTime = time.Now().Format(time.RFC3339Nano)
			if err := j.jobFactory.updateStore.ModifyUpdate(ctx, current); err != nil {
				return nil, nil, errors.Wrap(err, "fail to modify update opaque data")
			}
		}

		// No new workflow is needed; report the current update and version.
		version := versionutil.GetJobEntityVersion(
			j.runtime.GetConfigurationVersion(),
			j.runtime.GetDesiredStateVersion(),
			j.runtime.GetWorkflowVersion(),
		)
		return current.GetUpdateID(), version, nil
	}

	newConfig, err := j.compareAndSetConfig(
		ctx,
		wfOpts.jobConfig,
		wfOpts.configAddOn,
		wfOpts.jobSpec,
	)
	if err != nil {
		return nil, nil, err
	}

	updateID := &peloton.UpdateID{Value: uuid.New()}
	workflow := newUpdate(updateID, j.jobFactory)

	err = workflow.Create(
		ctx,
		j.id,
		newConfig,
		wfOpts.prevJobConfig,
		wfOpts.configAddOn,
		wfOpts.instanceAdded,
		wfOpts.instanceUpdated,
		wfOpts.instanceRemoved,
		workflowType,
		updateConfig,
		wfOpts.opaqueData,
	)
	if err != nil {
		// Return without invalidating the job config cache. A later read of
		// the job config checks whether runtime.GetConfigurationVersion
		// matches the cached config's version; if it does not, the config
		// cache is invalidated and repopulated with the correct version.
		return nil, nil, err
	}

	configVersion := newConfig.GetChangeLog().GetVersion()
	nextWorkflowVersion := j.runtime.GetWorkflowVersion() + 1
	if err := j.updateJobRuntime(ctx, configVersion, nextWorkflowVersion, workflow); err != nil {
		// The workflow is deliberately not added to j.workflows here: it is
		// unclear whether the update ID reached the persisted job runtime.
		// If it did, workflow.Recover and AddWorkflow can ensure that the
		// job tracks the workflow when the workflow is processed.
		return updateID, nil, err
	}

	// Runtime update succeeded, so the job can start tracking the workflow.
	j.workflows[updateID.GetValue()] = workflow

	notifyJobType = j.jobType
	notifySummary, notifyUpdate = j.generateJobSummaryFromCache(j.runtime, updateID)

	// The entity version changes because the config version changed.
	newEntityVersion := versionutil.GetJobEntityVersion(
		j.runtime.GetConfigurationVersion(),
		j.runtime.GetDesiredStateVersion(),
		j.runtime.GetWorkflowVersion(),
	)

	entry := log.WithField("workflow_id", updateID.GetValue()).
		WithField("job_id", j.id.GetValue()).
		WithField("instances_added", len(wfOpts.instanceAdded)).
		WithField("instances_updated", len(wfOpts.instanceUpdated)).
		WithField("instances_removed", len(wfOpts.instanceRemoved)).
		WithField("workflow_type", workflowType.String())
	entry.Debug("workflow is created")

	return updateID, newEntityVersion, nil
}