in pkg/jobmgr/jobsvc/handler.go [255:413]
// Update handles the JobManager Update API for an existing job.
//
// The request is rejected when this instance is not the leader, when the job
// runtime or the currently stored configuration cannot be read, when the job
// is in a terminal state, when the stored configuration is not of type BATCH,
// or when the secrets / new configuration fail validation. When the new
// configuration adds no instances and the request carries no secrets, the
// call is treated as a noop and returns (nil, nil). Otherwise the merged
// configuration is persisted, the job runtime is written with state
// INITIALIZED and the new configuration version, and the job is enqueued
// into the goal state driver.
//
// The outcome of every call is logged by the deferred function, which reads
// the named results resp and err.
func (h *serviceHandler) Update(
	ctx context.Context,
	req *job.UpdateRequest) (resp *job.UpdateResponse, err error) {
	defer func() {
		jobID := req.GetId().GetValue()
		headers := yarpcutil.GetHeaders(ctx)
		configVersion := req.GetConfig().GetChangeLog().GetVersion()

		if err != nil || resp.GetError() != nil {
			entry := log.WithField("job_id", jobID).
				WithField("headers", headers).
				WithField("config_version", configVersion)
			if err != nil {
				entry = entry.WithError(err)
			}
			if resp.GetError() != nil {
				entry = entry.WithField("update_error", resp.GetError().String())
			}
			entry.Warn("JobManager.Update failed")
			return
		}

		log.WithField("job_id", jobID).
			WithField("response", resp).
			WithField("config_version", configVersion).
			WithField("headers", headers).
			Info("JobManager.Update succeeded")
	}()

	h.metrics.JobAPIUpdate.Inc(1)

	if !h.candidate.IsLeader() {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, yarpcerrors.UnavailableErrorf(
			"Job Update API not suppported on non-leader")
	}

	jobID := req.GetId()
	cachedJob := h.jobFactory.AddJob(jobID)
	jobRuntime, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		log.WithError(err).
			WithField("job_id", jobID.GetValue()).
			Error("Failed to get runtime")
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}

	if util.IsPelotonJobStateTerminal(jobRuntime.State) {
		h.metrics.JobUpdateFail.Inc(1)
		// Use a constant format string instead of passing a pre-built
		// message as the format argument.
		return nil, yarpcerrors.InvalidArgumentErrorf(
			"Job is in a terminal state:%s", jobRuntime.State)
	}

	newConfig := req.GetConfig()

	// Read the configuration at the version the runtime currently points to.
	oldConfig, oldConfigAddOn, err := h.jobConfigOps.Get(
		ctx,
		jobID,
		jobRuntime.GetConfigurationVersion())
	if err != nil {
		log.WithError(err).
			WithField("job_id", jobID.GetValue()).
			Error("Failed to GetJobConfig")
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}

	if oldConfig.GetType() != job.JobType_BATCH {
		// Count this rejection like the other failure paths in this handler.
		h.metrics.JobUpdateFail.Inc(1)
		return nil, yarpcerrors.InvalidArgumentErrorf(
			"job update is only supported for batch jobs")
	}

	// Keep the job in its current resource pool when the request does not
	// name one.
	if newConfig.GetRespoolID() == nil {
		newConfig.RespoolID = oldConfig.GetRespoolID()
	}

	// Remove the existing secret volumes from the config. These were added by
	// peloton at the time of secret creation. We will add them to new config
	// after validating the new config at the time of handling secrets. If we
	// keep these volumes in oldConfig, ValidateUpdatedConfig will fail.
	existingSecretVolumes := util.RemoveSecretVolumesFromJobConfig(oldConfig)

	// check secrets and new config for input sanity
	// NOTE(review): this path does not increment JobUpdateFail here; confirm
	// whether validateSecretsAndConfig accounts for the failure itself.
	if err = h.validateSecretsAndConfig(newConfig, req.GetSecrets()); err != nil {
		return nil, err
	}

	err = jobconfig.ValidateUpdatedConfig(oldConfig, newConfig, h.jobSvcCfg.MaxTasksPerJob)
	if err != nil {
		h.metrics.JobUpdateFail.Inc(1)
		// The validation message is runtime text, so pass it as an argument
		// rather than as the format string.
		return nil, yarpcerrors.InvalidArgumentErrorf("%s", err.Error())
	}

	if err = h.handleUpdateSecrets(ctx, jobID, existingSecretVolumes, newConfig,
		req.GetSecrets()); err != nil {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}

	instancesToAdd := newConfig.GetInstanceCount() -
		oldConfig.GetInstanceCount()
	// You could just update secrets of a job without changing instance count.
	// In that case, do not treat this Update as NOOP.
	if instancesToAdd <= 0 && len(req.GetSecrets()) == 0 {
		log.WithField("job_id", jobID.GetValue()).
			Info("update is a noop")
		return nil, nil
	}

	// Carry the resource pool system label value over from the old config
	// add-on; if several labels match, the last one wins.
	var respoolPath string
	for _, label := range oldConfigAddOn.GetSystemLabels() {
		if label.GetKey() == common.SystemLabelResourcePool {
			respoolPath = label.GetValue()
		}
	}
	newConfigAddOn := &models.ConfigAddOn{
		SystemLabels: jobutil.ConstructSystemLabels(newConfig, respoolPath),
	}

	// first persist the configuration
	newUpdatedConfig, err := cachedJob.CompareAndSetConfig(
		ctx,
		mergeInstanceConfig(oldConfig, newConfig),
		newConfigAddOn,
		nil)
	if err != nil {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}

	// next persist the runtime state and the new configuration version
	err = cachedJob.Update(ctx, &job.JobInfo{
		Runtime: &job.RuntimeInfo{
			ConfigurationVersion: newUpdatedConfig.GetChangeLog().GetVersion(),
			State:                job.JobState_INITIALIZED,
		},
	}, nil,
		nil,
		cached.UpdateCacheAndDB)
	if err != nil {
		h.metrics.JobUpdateFail.Inc(1)
		return nil, err
	}

	// Hand the job to the goal state driver now that config and runtime are
	// persisted.
	h.goalStateDriver.EnqueueJob(jobID, time.Now())

	h.metrics.JobUpdate.Inc(1)
	msg := fmt.Sprintf("added %d instances", instancesToAdd)
	return &job.UpdateResponse{
		Id:      jobID,
		Message: msg,
	}, nil
}