in pkg/jobmgr/jobsvc/stateless/handler.go [260:397]
func (h *serviceHandler) ReplaceJob(
	ctx context.Context,
	req *svc.ReplaceJobRequest) (resp *svc.ReplaceJobResponse, err error) {
	var updateID *peloton.UpdateID
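
	// Log the outcome on the way out and convert any internal error into the
	// YARPC error space before it reaches the caller.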
	defer func() {
		jobID := req.GetJobId().GetValue()
		specVersion := req.GetSpec().GetRevision().GetVersion()
		entityVersion := req.GetVersion().GetValue()
		headers := yarpcutil.GetHeaders(ctx)

		if err != nil {
			log.WithField("job_id", jobID).
				WithField("spec_version", specVersion).
				WithField("entity_version", entityVersion).
				WithField("headers", headers).
				WithError(err).
				Warn("JobSVC.ReplaceJob failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}

		log.WithField("job_id", jobID).
			WithField("spec_version", specVersion).
			WithField("entity_version", entityVersion).
			WithField("response", resp).
			WithField("update_id", updateID.GetValue()).
			WithField("headers", headers).
			Info("JobSVC.ReplaceJob succeeded")
	}()
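
	// ReplaceJob is only served by the leader job manager instance;
	// non-leaders reject the request as unavailable.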
	if !h.candidate.IsLeader() {
		return nil,
			yarpcerrors.UnavailableErrorf("JobSVC.ReplaceJob is not supported on non-leader")
	}

	// TODO: handle secrets
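	// Job IDs must be well-formed UUIDs; reject anything else up front.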
	jobUUID := uuid.Parse(req.GetJobId().GetValue())
	if jobUUID == nil {
		return nil, yarpcerrors.InvalidArgumentErrorf(
			"JobID must be of UUID format")
	}
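
	// Rewrite the spec for the Thermos executor where applicable, then
	// translate the v1alpha job spec into the internal job config used by
	// the rest of job manager.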
	jobSpec, err := handlerutil.ConvertForThermosExecutor(
		req.GetSpec(),
		h.jobSvcCfg.ThermosExecutor,
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to convert for thermos executor")
	}

	jobConfig, err := api.ConvertJobSpecToJobConfig(jobSpec)
	if err != nil {
		return nil, errors.Wrap(err, "failed to convert job spec")
	}
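
	// Validate the converted config against job manager limits, including
	// the per-job task cap.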
	err = jobconfig.ValidateConfig(
		jobConfig,
		h.jobSvcCfg.MaxTasksPerJob,
	)
	if err != nil {
		return nil, errors.Wrap(err, "invalid job spec")
	}
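
	// Add the job to the in-memory cache (a no-op if already present) and
	// read its runtime to find the configuration version currently in use.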
	jobID := &peloton.JobID{Value: req.GetJobId().GetValue()}
	cachedJob := h.jobFactory.AddJob(jobID)

	jobRuntime, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get job runtime from cache")
	}

	prevJobConfig, prevConfigAddOn, err := h.jobConfigOps.Get(
		ctx,
		jobID,
		jobRuntime.GetConfigurationVersion())
	if err != nil {
		return nil, errors.Wrap(err, "failed to get previous job spec")
	}
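
	// Reject changes that are not allowed between the deployed config and
	// its replacement.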
	if err := validateJobConfigUpdate(prevJobConfig, jobConfig); err != nil {
		return nil, errors.Wrap(err, "failed to validate spec update")
	}

	// Build the new ConfigAddOn, carrying the resource pool path forward
	// from the previous config's system labels.
	var respoolPath string
	for _, label := range prevConfigAddOn.GetSystemLabels() {
		if label.GetKey() == common.SystemLabelResourcePool {
			respoolPath = label.GetValue()
		}
	}
	configAddOn := &models.ConfigAddOn{
		SystemLabels: jobutil.ConstructSystemLabels(jobConfig, respoolPath),
	}
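
	// Attach any caller-supplied opaque data to the new workflow.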
	opaque := cached.WithOpaqueData(nil)
	if req.GetOpaqueData() != nil {
		opaque = cached.WithOpaqueData(&peloton.OpaqueData{
			Data: req.GetOpaqueData().GetData(),
		})
	}

	// If the changelog is set, CreateWorkflow uses the version inside it for
	// concurrency control. For ReplaceJob, however, concurrency control is
	// done via the entity version, so the user should not have to provide a
	// config version when an entity version is already supplied.
	jobConfig.ChangeLog = nil

	updateID, newEntityVersion, err := cachedJob.CreateWorkflow(
		ctx,
		models.WorkflowType_UPDATE,
		api.ConvertUpdateSpecToUpdateConfig(req.GetUpdateSpec()),
		req.GetVersion(),
		cached.WithConfig(
			jobConfig,
			prevJobConfig,
			configAddOn,
			req.GetSpec()),
		opaque,
	)

	// On error it is unclear whether the job runtime was persisted with the
	// update ID. Enqueue the update into the goal state engine either way:
	// if the update ID was persisted the update starts running, otherwise it
	// is aborted. If the update was not persisted, the cache is also cleared
	// so that it is reloaded from the DB and cleaned up.
	if len(updateID.GetValue()) > 0 {
		h.goalStateDriver.EnqueueUpdate(jobID, updateID, time.Now())
	}

	if err != nil {
		return nil, errors.Wrap(err, "failed to create update workflow")
	}

	return &svc.ReplaceJobResponse{Version: newEntityVersion}, nil
}