// Excerpt: (*serviceHandler).Update, from pkg/jobmgr/jobsvc/handler.go [255:413]

// Update updates the configuration of an existing job. Only BATCH jobs are
// supported. An update may grow the instance count and/or replace job
// secrets; shrinking the instance count is treated as a no-op unless secrets
// are also supplied. On success the new config is persisted, the job runtime
// is moved to INITIALIZED at the new configuration version, and the job is
// enqueued to the goal state engine so the added instances get started.
//
// The named results (resp, err) are intentional: the deferred logger below
// inspects both to emit a single success/failure log line per call.
func (h *serviceHandler) Update(
	ctx context.Context,
	req *job.UpdateRequest) (resp *job.UpdateResponse, err error) {
	defer func() {
		jobID := req.GetId().GetValue()
		headers := yarpcutil.GetHeaders(ctx)
		configVersion := req.GetConfig().GetChangeLog().GetVersion()

		// A failure is either a transport-level error (err) or an
		// application-level error embedded in the response.
		if err != nil || resp.GetError() != nil {
			entry := log.WithField("job_id", jobID).
				WithField("headers", headers).
				WithField("config_version", configVersion)

			if err != nil {
				entry = entry.WithError(err)
			}

			if resp.GetError() != nil {
				entry = entry.WithField("update_error", resp.GetError().String())
			}

			entry.Warn("JobManager.Update failed")
			return
		}

		log.WithField("job_id", jobID).
			WithField("response", resp).
			WithField("config_version", configVersion).
			WithField("headers", headers).
			Info("JobManager.Update succeeded")
	}()

	h.metrics.JobAPIUpdate.Inc(1)

	// Update is a write API; only the elected leader may serve it.
	if !h.candidate.IsLeader() {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, yarpcerrors.UnavailableErrorf(
			"Job Update API not supported on non-leader")
	}

	jobID := req.GetId()
	cachedJob := h.jobFactory.AddJob(jobID)
	jobRuntime, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", jobID.GetValue()).
			Error("Failed to get runtime")
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}
	// Terminal jobs (SUCCEEDED/FAILED/KILLED) cannot be updated.
	if util.IsPelotonJobStateTerminal(jobRuntime.State) {
		msg := fmt.Sprintf("Job is in a terminal state:%s", jobRuntime.State)
		h.metrics.JobUpdateFail.Inc(1)
		return nil, yarpcerrors.InvalidArgumentErrorf(msg)
	}

	newConfig := req.GetConfig()

	// Load the currently-persisted config at the version recorded in runtime.
	// NOTE: the original code had a second, unreachable `if err != nil` block
	// after this one (err was already checked and returned); the GetJobConfig
	// failure log has been folded into this single check.
	oldConfig, oldConfigAddOn, err := h.jobConfigOps.Get(
		ctx,
		jobID,
		jobRuntime.GetConfigurationVersion())
	if err != nil {
		log.WithError(err).
			WithField("job_id", jobID.GetValue()).
			Error("Failed to GetJobConfig")
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}

	if oldConfig.GetType() != job.JobType_BATCH {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, yarpcerrors.InvalidArgumentErrorf(
			"job update is only supported for batch jobs")
	}

	// Callers may omit the resource pool in the new config; inherit it.
	if newConfig.GetRespoolID() == nil {
		newConfig.RespoolID = oldConfig.GetRespoolID()
	}

	// Remove the existing secret volumes from the config. These were added by
	// peloton at the time of secret creation. We will add them to new config
	// after validating the new config at the time of handling secrets. If we
	// keep these volumes in oldConfig, ValidateUpdatedConfig will fail.
	existingSecretVolumes := util.RemoveSecretVolumesFromJobConfig(oldConfig)

	// check secrets and new config for input sanity
	if err = h.validateSecretsAndConfig(newConfig, req.GetSecrets()); err != nil {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}
	err = jobconfig.ValidateUpdatedConfig(oldConfig, newConfig, h.jobSvcCfg.MaxTasksPerJob)
	if err != nil {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, yarpcerrors.InvalidArgumentErrorf(err.Error())
	}

	// Persist/merge secrets and re-attach secret volumes to the new config.
	if err = h.handleUpdateSecrets(ctx, jobID, existingSecretVolumes, newConfig,
		req.GetSecrets()); err != nil {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}

	instancesToAdd := newConfig.GetInstanceCount() -
		oldConfig.GetInstanceCount()
	// You could just update secrets of a job without changing instance count.
	// In that case, do not treat this Update as NOOP.
	if instancesToAdd <= 0 && len(req.GetSecrets()) == 0 {
		log.WithField("job_id", jobID.GetValue()).
			Info("update is a noop")
		return nil, nil
	}

	// Carry the resource-pool system label forward into the new config add-on.
	var respoolPath string
	for _, label := range oldConfigAddOn.GetSystemLabels() {
		if label.GetKey() == common.SystemLabelResourcePool {
			respoolPath = label.GetValue()
		}
	}
	newConfigAddOn := &models.ConfigAddOn{
		SystemLabels: jobutil.ConstructSystemLabels(newConfig, respoolPath),
	}
	// first persist the configuration
	newUpdatedConfig, err := cachedJob.CompareAndSetConfig(
		ctx,
		mergeInstanceConfig(oldConfig, newConfig),
		newConfigAddOn,
		nil)
	if err != nil {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}

	// next persist the runtime state and the new configuration version
	err = cachedJob.Update(ctx, &job.JobInfo{
		Runtime: &job.RuntimeInfo{
			ConfigurationVersion: newUpdatedConfig.GetChangeLog().GetVersion(),
			State:                job.JobState_INITIALIZED,
		},
	}, nil,
		nil,
		cached.UpdateCacheAndDB)
	if err != nil {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}

	// Wake the goal-state engine so the newly added instances are acted upon.
	h.goalStateDriver.EnqueueJob(jobID, time.Now())

	h.metrics.JobUpdate.Inc(1)
	msg := fmt.Sprintf("added %d instances", instancesToAdd)
	return &job.UpdateResponse{
		Id:      jobID,
		Message: msg,
	}, nil
}