func()

in pkg/jobmgr/jobsvc/stateless/handler.go [260:397]


// ReplaceJob replaces the spec of an existing stateless job by starting an
// UPDATE workflow on it.
//
// Steps, in order:
//   - reject the call unless this instance is the leader (Unavailable);
//   - require the job ID to parse as a UUID (InvalidArgument);
//   - convert the new spec for the Thermos executor and into a job config,
//     then check it with jobconfig.ValidateConfig (using MaxTasksPerJob);
//   - load the job's current runtime and the config stored at its current
//     configuration version, and check the transition with
//     validateJobConfigUpdate;
//   - build the new system labels, carrying over the resource-pool path from
//     the previous config's system labels;
//   - create the UPDATE workflow, using the request's entity version for
//     concurrency control.
//
// On success the response carries the new entity version. Every failure is
// logged with the job/spec/entity versions and request headers, and is
// converted with yarpcutil.ConvertToYARPCError before being returned.
func (h *serviceHandler) ReplaceJob(
	ctx context.Context,
	req *svc.ReplaceJobRequest) (resp *svc.ReplaceJobResponse, err error) {
	// Captured by the deferred logger below. It is assigned by
	// CreateWorkflow further down.
	var updateID *peloton.UpdateID

	// Log the outcome once the named results are final. Any error is also
	// rewritten into a YARPC error here, so callers always receive one.
	defer func() {
		entry := log.WithField("job_id", req.GetJobId().GetValue()).
			WithField("spec_version", req.GetSpec().GetRevision().GetVersion()).
			WithField("entity_version", req.GetVersion().GetValue()).
			WithField("headers", yarpcutil.GetHeaders(ctx))

		if err == nil {
			entry.WithField("response", resp).
				WithField("update_id", updateID.GetValue()).
				Info("JobSVC.ReplaceJob succeeded")
			return
		}

		entry.WithError(err).Warn("JobSVC.ReplaceJob failed")
		err = yarpcutil.ConvertToYARPCError(err)
	}()

	if !h.candidate.IsLeader() {
		return nil,
			yarpcerrors.UnavailableErrorf("JobSVC.ReplaceJob is not supported on non-leader")
	}

	// TODO: handle secrets
	if uuid.Parse(req.GetJobId().GetValue()) == nil {
		return nil, yarpcerrors.InvalidArgumentErrorf(
			"JobID must be of UUID format")
	}

	thermosSpec, err := handlerutil.ConvertForThermosExecutor(
		req.GetSpec(),
		h.jobSvcCfg.ThermosExecutor,
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to convert for thermos executor")
	}

	newConfig, err := api.ConvertJobSpecToJobConfig(thermosSpec)
	if err != nil {
		return nil, errors.Wrap(err, "failed to convert job spec")
	}

	if err = jobconfig.ValidateConfig(newConfig, h.jobSvcCfg.MaxTasksPerJob); err != nil {
		return nil, errors.Wrap(err, "invalid job spec")
	}

	jobID := &peloton.JobID{Value: req.GetJobId().GetValue()}
	cachedJob := h.jobFactory.AddJob(jobID)

	currentRuntime, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get job runtime from cache")
	}

	// Fetch the config that is currently in effect so the new one can be
	// checked against it.
	prevConfig, prevAddOn, err := h.jobConfigOps.Get(
		ctx,
		jobID,
		currentRuntime.GetConfigurationVersion(),
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get previous job spec")
	}

	if err = validateJobConfigUpdate(prevConfig, newConfig); err != nil {
		return nil, errors.Wrap(err, "failed to validate spec update")
	}

	// Carry the resource-pool path over from the previous system labels.
	// There is no early exit, so if the label repeats the last one wins.
	var respoolPath string
	for _, lbl := range prevAddOn.GetSystemLabels() {
		if lbl.GetKey() == common.SystemLabelResourcePool {
			respoolPath = lbl.GetValue()
		}
	}
	newAddOn := &models.ConfigAddOn{
		SystemLabels: jobutil.ConstructSystemLabels(newConfig, respoolPath),
	}

	// Opaque data is forwarded only when the request supplies it.
	opaque := cached.WithOpaqueData(nil)
	if reqOpaque := req.GetOpaqueData(); reqOpaque != nil {
		opaque = cached.WithOpaqueData(&peloton.OpaqueData{
			Data: reqOpaque.GetData(),
		})
	}

	// When ChangeLog is set, CreateWorkflow uses the version inside it for
	// concurrency control. ReplaceJob relies on the entity version instead,
	// so a caller that supplies an entity version must not also have to
	// supply a config version. Hence the change log is cleared.
	newConfig.ChangeLog = nil

	// Plain "=" semantics here: updateID and err are reused (declared in the
	// function body / result list), so the deferred logger sees updateID.
	updateID, newEntityVersion, err := cachedJob.CreateWorkflow(
		ctx,
		models.WorkflowType_UPDATE,
		api.ConvertUpdateSpecToUpdateConfig(req.GetUpdateSpec()),
		req.GetVersion(),
		cached.WithConfig(
			newConfig,
			prevConfig,
			newAddOn,
			req.GetSpec()),
		opaque,
	)

	// Enqueue whenever an update ID came back, even if err is also set. It is
	// unclear here whether the runtime was persisted with that update ID. If
	// it was, the goal state should run the update; if not, it should abort
	// it. Enqueueing covers both cases.
	// NOTE(review): an earlier comment here said the cache is also cleared
	// when the update was not persisted. Nothing in this method does that;
	// presumably CreateWorkflow handles it — confirm.
	if updateID.GetValue() != "" {
		h.goalStateDriver.EnqueueUpdate(jobID, updateID, time.Now())
	}

	if err != nil {
		return nil, errors.Wrap(err, "failed to create update workload")
	}

	return &svc.ReplaceJobResponse{Version: newEntityVersion}, nil
}