in pkg/jobmgr/cached/job.go [1096:1214]
// Update applies the config and/or runtime carried by jobInfo to the cached
// job and, when req is UpdateCacheAndDB, also writes the result to storage
// through the job factory's config, runtime and index ops.
//
// configAddOn must be non-nil whenever jobInfo carries a config; otherwise an
// error is returned before anything is modified. spec is only forwarded to the
// job config store when a new config version is created.
//
// Listeners are always notified through jobFactory.notifyJobSummaryChanged
// once the job lock has been released. The job type passed to them is only
// filled in when Update succeeds, and the summary/update model are only filled
// in when a runtime was written to storage and the index write did not fail.
func (j *job) Update(
	ctx context.Context,
	jobInfo *pbjob.JobInfo,
	configAddOn *models.ConfigAddOn,
	spec *stateless.JobSpec,
	req UpdateRequest) error {
	// Snapshot handed to listeners; populated while the lock is held.
	var (
		notifyType    pbjob.JobType
		notifySummary *pbjob.JobSummary
		notifyUpdate  *models.UpdateModel
	)

	// Deferred before the unlock below, so it executes after it: listeners are
	// invoked without the job lock held.
	defer func() {
		j.jobFactory.notifyJobSummaryChanged(
			j.ID(), notifyType, notifySummary, notifyUpdate)
	}()

	j.Lock()
	defer j.Unlock()

	var (
		newConfig  *pbjob.JobConfig
		newRuntime *pbjob.RuntimeInfo
		err        error
	)

	if cfg := jobInfo.GetConfig(); cfg != nil {
		if configAddOn == nil {
			return fmt.Errorf(
				"ConfigAddOn cannot be nil when 'JobInfo.JobConfig' is not nil")
		}

		switch req {
		case UpdateCacheOnly:
			// Replace the cached config unless the cache already holds a
			// strictly newer version than the incoming one.
			stale := j.config != nil &&
				j.config.changeLog.GetVersion() > cfg.GetChangeLog().GetVersion()
			if !stale {
				j.populateJobConfigCache(cfg)
			}
		default:
			newConfig, err = j.getUpdatedJobConfigCache(ctx, cfg, req)
			if err != nil {
				// A validation failure leaves the cache as it is; any other
				// error invalidates it.
				if !yarpcerrors.IsInvalidArgument(err) {
					j.invalidateCache()
				}
				return err
			}
			if newConfig != nil {
				j.populateJobConfigCache(newConfig)
			}
		}
	}

	if rt := jobInfo.GetRuntime(); rt != nil {
		newRuntime, err = j.getUpdatedJobRuntimeCache(ctx, rt, req)
		if err != nil {
			// _updateDeleteJobErr is the one error that does not invalidate
			// the cache.
			if err != _updateDeleteJobErr {
				j.invalidateCache()
			}
			return err
		}
		if newRuntime != nil {
			j.runtime = newRuntime
		}
	}

	if req == UpdateCacheAndDB {
		// Ordering matters: the config is persisted before the runtime.
		// Writing the config creates a new config entry, and writing the
		// runtime tells the job to use the latest config. If the runtime were
		// stored first and the config write then failed, the job would point
		// at a config that does not exist.
		if newConfig != nil {
			// Every config version gets its own job_config entry, hence
			// Create rather than Update.
			if err := j.jobFactory.jobConfigOps.Create(
				ctx,
				j.ID(),
				newConfig,
				configAddOn,
				spec,
				newConfig.GetChangeLog().GetVersion(),
			); err != nil {
				j.invalidateCache()
				return err
			}
		}

		if newRuntime != nil {
			if err := j.jobFactory.jobRuntimeOps.Upsert(
				ctx, j.ID(), newRuntime); err != nil {
				j.invalidateCache()
				return err
			}
			notifySummary, notifyUpdate = j.generateJobSummaryFromCache(
				j.runtime, j.runtime.GetUpdateID())
		}

		if changed := newConfig != nil || newRuntime != nil; changed {
			if err := j.jobFactory.jobIndexOps.Update(
				ctx,
				j.ID(),
				newConfig,
				newRuntime,
			); err != nil {
				j.invalidateCache()
				// Do not hand listeners a summary when the index write failed.
				notifySummary, notifyUpdate = nil, nil
				return err
			}
		}
	}

	notifyType = j.jobType
	return nil
}