in pkg/jobmgr/cached/job.go [2335:2498]
// CreateWorkflow starts a workflow of the given type for the job.
//
// The caller-supplied entityVersion is validated first. If the request turns
// out to be a no-op relative to the current update, no new workflow is
// created: the current update ID is returned, and only when the opaque data
// differs are the workflow version and the stored update refreshed. Otherwise
// the job config is set, a workflow with a freshly generated update ID is
// created, and the job runtime is updated to reference it.
//
// It returns the update ID, the resulting job entity version, and an error.
// Note that on some failure paths a non-nil update ID is returned together
// with the error.
func (j *job) CreateWorkflow(
	ctx context.Context,
	workflowType models.WorkflowType,
	updateConfig *pbupdate.UpdateConfig,
	entityVersion *v1alphapeloton.EntityVersion,
	options ...Option,
) (*peloton.UpdateID, *v1alphapeloton.EntityVersion, error) {
	// Arguments for the listener notification. They are only filled in once a
	// new workflow has been registered; on every earlier return they keep
	// their zero values.
	var (
		notifyJobType pbjob.JobType
		notifySummary *pbjob.JobSummary
		notifyUpdate  *models.UpdateModel
	)

	// Deferred calls run in reverse order of registration. This one is
	// registered before the unlock below, so listeners are notified only
	// after the job lock has been dropped.
	defer func() {
		j.jobFactory.notifyJobSummaryChanged(
			j.ID(),
			notifyJobType,
			notifySummary,
			notifyUpdate,
		)
	}()

	j.Lock()
	defer j.Unlock()

	if err := j.ValidateEntityVersion(ctx, entityVersion); err != nil {
		return nil, nil, err
	}

	// A terminal goal state with a non-terminal current state means the job
	// is still on its way down; reject requests that would start tasks.
	terminating := util.IsPelotonJobStateTerminal(j.runtime.GetGoalState()) &&
		!util.IsPelotonJobStateTerminal(j.runtime.GetState())
	if terminating && updateConfig.GetStartTasks() {
		return nil, nil, yarpcerrors.AbortedErrorf(
			"job is being terminated, cannot update with start_pods set now")
	}

	currentUpdate, err := j.getCurrentUpdate(ctx)
	if err != nil {
		return nil, nil, err
	}

	opts := new(workflowOpts)
	for _, o := range options {
		o.apply(opts)
	}

	noop := j.isWorkflowNoop(
		ctx,
		opts.prevJobConfig,
		opts.jobConfig,
		updateConfig,
		workflowType,
		currentUpdate,
	)
	if noop {
		if opts.opaqueData.GetData() != currentUpdate.GetOpaqueData().GetData() {
			// Bump the workflow version before touching the opaque data so
			// that the opaque data change is reflected as an entity version
			// change. Users may act on opaque data, so they must be able to
			// tell that what they hold is no longer up to date.
			j.runtime.WorkflowVersion++
			bumpedRuntime := j.mergeRuntime(
				&pbjob.RuntimeInfo{WorkflowVersion: j.runtime.GetWorkflowVersion()},
			)
			if err := j.jobFactory.jobRuntimeOps.Upsert(ctx, j.id, bumpedRuntime); err != nil {
				// The in-memory runtime was already modified above; drop the
				// cache since the write did not go through.
				j.invalidateCache()
				return currentUpdate.GetUpdateID(),
					nil,
					errors.Wrap(err, "fail to update job runtime when create workflow")
			}

			// TODO: move this under update cache object
			currentUpdate.OpaqueData = &peloton.OpaqueData{Data: opts.opaqueData.GetData()}
			currentUpdate.UpdateTime = time.Now().Format(time.RFC3339Nano)
			modifyErr := j.jobFactory.updateStore.ModifyUpdate(ctx, currentUpdate)
			if modifyErr != nil {
				return nil, nil, errors.Wrap(modifyErr, "fail to modify update opaque data")
			}
		}

		// No new workflow is needed; report the current update and the
		// (possibly bumped) entity version.
		return currentUpdate.GetUpdateID(),
			versionutil.GetJobEntityVersion(
				j.runtime.GetConfigurationVersion(),
				j.runtime.GetDesiredStateVersion(),
				j.runtime.GetWorkflowVersion(),
			),
			nil
	}

	newConfig, err := j.compareAndSetConfig(
		ctx,
		opts.jobConfig,
		opts.configAddOn,
		opts.jobSpec,
	)
	if err != nil {
		return nil, nil, err
	}

	workflowID := &peloton.UpdateID{Value: uuid.New()}
	workflow := newUpdate(workflowID, j.jobFactory)
	createErr := workflow.Create(
		ctx,
		j.id,
		newConfig,
		opts.prevJobConfig,
		opts.configAddOn,
		opts.instanceAdded,
		opts.instanceUpdated,
		opts.instanceRemoved,
		workflowType,
		updateConfig,
		opts.opaqueData,
	)
	if createErr != nil {
		// Return as-is and leave the job config cache alone. A later config
		// read compares runtime.GetConfigurationVersion against the cached
		// config version and, on mismatch, invalidates and repopulates the
		// cache with the correct version.
		return nil, nil, createErr
	}

	if err := j.updateJobRuntime(
		ctx,
		newConfig.GetChangeLog().GetVersion(),
		j.runtime.GetWorkflowVersion()+1,
		workflow,
	); err != nil {
		return workflowID, nil, err
	}

	// Track the workflow on the job only after the runtime update succeeded.
	// When that update fails it is unknown whether the update id reached the
	// persisted job runtime; if it did, workflow.Recover and AddWorkflow make
	// sure the job tracks the workflow once the workflow is processed.
	j.workflows[workflowID.GetValue()] = workflow
	notifyJobType = j.jobType
	notifySummary, notifyUpdate = j.generateJobSummaryFromCache(j.runtime, workflowID)

	log.WithField("workflow_id", workflowID.GetValue()).
		WithField("job_id", j.id.GetValue()).
		WithField("instances_added", len(opts.instanceAdded)).
		WithField("instances_updated", len(opts.instanceUpdated)).
		WithField("instances_removed", len(opts.instanceRemoved)).
		WithField("workflow_type", workflowType.String()).
		Debug("workflow is created")

	// The config version changed, so the entity version is recomputed from
	// the runtime.
	return workflowID,
		versionutil.GetJobEntityVersion(
			j.runtime.GetConfigurationVersion(),
			j.runtime.GetDesiredStateVersion(),
			j.runtime.GetWorkflowVersion(),
		),
		nil
}