func()

in pkg/jobmgr/cached/job.go [1096:1214]


func (j *job) Update(
	ctx context.Context,
	jobInfo *pbjob.JobInfo,
	configAddOn *models.ConfigAddOn,
	spec *stateless.JobSpec,
	req UpdateRequest) error {
	var (
		jobTypeCopy     pbjob.JobType
		jobSummaryCopy  *pbjob.JobSummary
		updateModelCopy *models.UpdateModel
	)
	// notify listeners after dropping the lock
	defer func() {
		j.jobFactory.notifyJobSummaryChanged(
			j.ID(),
			jobTypeCopy,
			jobSummaryCopy,
			updateModelCopy,
		)
	}()

	j.Lock()
	defer j.Unlock()

	var (
		updatedConfig *pbjob.JobConfig
		err           error
	)

	if jobInfo.GetConfig() != nil {
		if configAddOn == nil {
			return fmt.Errorf(
				"ConfigAddOn cannot be nil when 'JobInfo.JobConfig' is not nil")
		}
		if req == UpdateCacheOnly {
			// overwrite the cache after validating that
			// version is either the same or increasing
			if j.config == nil || j.config.changeLog.GetVersion() <=
				jobInfo.GetConfig().GetChangeLog().GetVersion() {
				j.populateJobConfigCache(jobInfo.GetConfig())
			}
		} else {
			updatedConfig, err = j.getUpdatedJobConfigCache(
				ctx, jobInfo.GetConfig(), req)
			if err != nil {
				// invalidate cache if error not from validation failure
				if !yarpcerrors.IsInvalidArgument(err) {
					j.invalidateCache()
				}
				return err
			}
			if updatedConfig != nil {
				j.populateJobConfigCache(updatedConfig)
			}
		}
	}

	var updatedRuntime *pbjob.RuntimeInfo
	if jobInfo.GetRuntime() != nil {
		updatedRuntime, err = j.getUpdatedJobRuntimeCache(ctx, jobInfo.GetRuntime(), req)
		if err != nil {
			if err != _updateDeleteJobErr {
				j.invalidateCache()
			}
			return err
		}
		if updatedRuntime != nil {
			j.runtime = updatedRuntime
		}
	}

	if req == UpdateCacheAndDB {
		// Must update config first then runtime. Update config would create a
		// new config entry and update runtime would ask job to use the latest
		// config. If we update the runtime first successfully, and update
		// config with failure, job would try to access a non-existent config.
		if updatedConfig != nil {
			// Create a new versioned, entry for job_config, so this is not
			// an Update
			if err := j.jobFactory.jobConfigOps.Create(
				ctx,
				j.ID(),
				updatedConfig,
				configAddOn,
				spec,
				updatedConfig.GetChangeLog().GetVersion(),
			); err != nil {
				j.invalidateCache()
				return err
			}
		}

		if updatedRuntime != nil {
			err := j.jobFactory.jobRuntimeOps.Upsert(ctx, j.ID(), updatedRuntime)
			if err != nil {
				j.invalidateCache()
				return err
			}

			jobSummaryCopy, updateModelCopy = j.generateJobSummaryFromCache(j.runtime, j.runtime.GetUpdateID())
		}

		if updatedConfig != nil || updatedRuntime != nil {
			if err := j.jobFactory.jobIndexOps.Update(
				ctx,
				j.ID(),
				updatedConfig,
				updatedRuntime,
			); err != nil {
				j.invalidateCache()
				jobSummaryCopy = nil
				updateModelCopy = nil
				return err
			}
		}
	}
	jobTypeCopy = j.jobType
	return nil
}