in pkg/jobmgr/cached/job.go [778:922]
// RollingCreate creates the job by way of an update workflow instead of
// creating all instances at once. It persists, in order: the active-jobs
// entry, the job runtime (tagged with a freshly generated update ID), the
// job-name-to-ID mapping (SERVICE jobs only), the update workflow, a dummy
// zero-instance config used as the workflow's starting point, the real
// config used as the workflow's target, the runtime moved to PENDING, and
// finally the job index entry.
//
// Arguments:
//   - config: the target job config; must be non-nil.
//   - configAddOn: stored together with both the dummy and the real config.
//   - spec: stored together with the real config only.
//   - updateConfig: passed to the workflow; must not request rollback on
//     failure, since there is no previous version to roll back to.
//   - opaqueData: passed through to the workflow.
//
// It returns an InvalidArgument error when config is nil or when
// updateConfig asks for rollback on failure. Any failure of a later
// persistence step invalidates the job cache and returns that step's error
// unchanged.
//
// The job lock is held for the whole call. Listeners are notified from a
// deferred call registered before the deferred Unlock, so the notification
// runs after the lock has been released. It runs on every return path,
// including failures; copies that were not populated yet are passed as
// their zero values.
func (j *job) RollingCreate(
	ctx context.Context,
	config *pbjob.JobConfig,
	configAddOn *models.ConfigAddOn,
	spec *stateless.JobSpec,
	updateConfig *pbupdate.UpdateConfig,
	opaqueData *peloton.OpaqueData,
) error {
	var (
		jobTypeCopy     pbjob.JobType
		jobSummaryCopy  *pbjob.JobSummary
		updateModelCopy *models.UpdateModel
	)

	// Notify listeners after dropping the lock: deferred calls run in LIFO
	// order, so this one executes after the Unlock deferred below.
	defer func() {
		j.jobFactory.notifyJobSummaryChanged(
			j.ID(),
			jobTypeCopy,
			jobSummaryCopy,
			updateModelCopy,
		)
	}()

	j.Lock()
	defer j.Unlock()

	// Argument validation. Nothing has been written yet, so the cache is
	// left untouched on these paths.
	if config == nil {
		return yarpcerrors.InvalidArgumentErrorf("missing config in jobInfo")
	}

	if updateConfig.GetRollbackOnFailure() {
		return yarpcerrors.InvalidArgumentErrorf("job creation cannot rollback on failure")
	}

	// Add jobID to active jobs table before creating job runtime. This should
	// happen every time a job is first created.
	if err := j.jobFactory.activeJobsOps.Create(
		ctx, j.ID()); err != nil {
		j.invalidateCache()
		return err
	}

	config = populateConfigChangeLog(config)

	// The dummy config is used as the starting config for the update
	// workflow: a clone of the real config with zero instances, the dummy
	// config version, and no default or per-instance config.
	dummyConfig := proto.Clone(config).(*pbjob.JobConfig)
	dummyConfig.InstanceCount = 0
	dummyConfig.ChangeLog.Version = jobmgrcommon.DummyConfigVersion
	dummyConfig.DefaultConfig = nil
	dummyConfig.InstanceConfig = nil

	// Every instance ID in [0, InstanceCount) is reported to the workflow
	// as an added instance.
	instancesAdded := make([]uint32, config.InstanceCount)
	for i := uint32(0); i < config.InstanceCount; i++ {
		instancesAdded[i] = i
	}

	// ID of the workflow which is going to initialize the job.
	updateID := &peloton.UpdateID{Value: uuid.New()}

	// Create job runtime and set state to UNINITIALIZED with updateID,
	// so on error recovery, update config such as batch size can be
	// recovered.
	if err := j.createJobRuntime(ctx, config, updateID); err != nil {
		j.invalidateCache()
		return err
	}

	// Create job name to job id mapping.
	// If the creation fails here, since job config is not created yet,
	// the job will be cleaned up in goalstate engine JobRecover action.
	if config.GetType() == pbjob.JobType_SERVICE {
		if err := j.jobFactory.jobNameToIDOps.Create(
			ctx,
			config.GetName(),
			j.ID(),
		); err != nil {
			j.invalidateCache()
			return err
		}
	}

	// Create the workflow going from the dummy config to the real config.
	newWorkflow := newUpdate(updateID, j.jobFactory)
	if err := newWorkflow.Create(
		ctx,
		j.id,
		config,
		dummyConfig,
		configAddOn,
		instancesAdded,
		nil,
		nil,
		models.WorkflowType_UPDATE,
		updateConfig,
		opaqueData,
	); err != nil {
		j.invalidateCache()
		return err
	}

	// Create the dummy config in db. It is possible that the dummy config
	// already exists in db when doing error retry, so an AlreadyExists error
	// is ignored here.
	if err := j.createJobConfig(ctx, dummyConfig, configAddOn, nil); err != nil &&
		!yarpcerrors.IsAlreadyExists(errors.Cause(err)) {
		j.invalidateCache()
		return err
	}

	// Create the real config as the target config for update workflow.
	// Once the config is persisted successfully in db, the job is considered
	// as created successfully, and should be able to recover from
	// rest of the error. Calling RollingCreate again after this call succeeds
	// would result in AlreadyExist error.
	if err := j.createJobConfig(ctx, config, configAddOn, spec); err != nil {
		j.invalidateCache()
		return err
	}

	jobTypeCopy = j.jobType

	// Both config and runtime are created, move the state to PENDING.
	j.runtime.State = pbjob.JobState_PENDING
	if err := j.jobFactory.jobRuntimeOps.Upsert(
		ctx,
		j.id,
		j.runtime); err != nil {
		j.invalidateCache()
		return err
	}

	if err := j.jobFactory.jobIndexOps.Create(
		ctx,
		j.id,
		config,
		j.runtime,
	); err != nil {
		j.invalidateCache()
		return err
	}

	// Register the workflow in the cache only after all writes succeeded.
	j.workflows[updateID.GetValue()] = newWorkflow

	// Create JobSummary and WorkflowStatus while we have the lock.
	jobSummaryCopy, updateModelCopy = j.generateJobSummaryFromCache(j.runtime, updateID)

	return nil
}