in pkg/jobmgr/jobsvc/stateless/handler.go [405:519]
// RestartJob restarts the instances of a stateless job (all instances, or
// only the ranges given in the request's RestartSpec) by creating a RESTART
// workflow on the cached job. It returns the new entity version produced by
// the workflow creation. Only the leader instance serves this call.
func (h *serviceHandler) RestartJob(
	ctx context.Context,
	req *svc.RestartJobRequest) (resp *svc.RestartJobResponse, err error) {
	// Emit a structured success/failure log on every exit path, and
	// normalize any error to a YARPC error before it reaches the caller.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err == nil {
			log.WithField("request", req).
				WithField("response", resp).
				WithField("headers", headers).
				Info("JobSVC.RestartJob succeeded")
			return
		}
		log.WithField("request", req).
			WithField("headers", headers).
			WithError(err).
			Warn("JobSVC.RestartJob failed")
		err = yarpcutil.ConvertToYARPCError(err)
	}()

	// Reject the call outright on non-leader instances.
	if !h.candidate.IsLeader() {
		return nil, yarpcerrors.UnavailableErrorf("JobSVC.RestartJob is not supported on non-leader")
	}

	id := &peloton.JobID{Value: req.GetJobId().GetValue()}
	job := h.jobFactory.AddJob(id)

	jobRuntime, err := job.GetRuntime(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job runtime")
	}

	configResult, err := h.jobConfigOps.GetResult(
		ctx,
		id,
		jobRuntime.GetConfigurationVersion(),
	)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job config")
	}
	currentConfig := configResult.JobConfig
	addOn := configResult.ConfigAddOn
	currentSpec := configResult.JobSpec

	// Shallow-copy the config, keeping the existing version number but
	// stamping fresh created/updated timestamps on the changelog.
	timestamp := time.Now()
	restartConfig := *currentConfig
	restartConfig.ChangeLog = &peloton.ChangeLog{
		Version:   currentConfig.GetChangeLog().GetVersion(),
		CreatedAt: uint64(timestamp.UnixNano()),
		UpdatedAt: uint64(timestamp.UnixNano()),
	}

	// Mirror the new changelog onto the v1alpha spec revision when a spec
	// exists. NOTE(review): when currentSpec is nil, restartSpec stays the
	// zero value and a pointer to it is still passed to WithConfig below —
	// presumably WithConfig tolerates an empty spec; verify if touching this.
	var restartSpec stateless.JobSpec
	if currentSpec != nil {
		restartSpec = *currentSpec
		restartSpec.Revision = &v1alphapeloton.Revision{
			Version:   restartConfig.GetChangeLog().GetVersion(),
			CreatedAt: restartConfig.GetChangeLog().GetCreatedAt(),
			UpdatedAt: restartConfig.GetChangeLog().GetUpdatedAt(),
		}
	}

	// Carry the caller's opaque data through to the workflow, if any.
	opaque := cached.WithOpaqueData(nil)
	if data := req.GetOpaqueData(); data != nil {
		opaque = cached.WithOpaqueData(&peloton.OpaqueData{
			Data: data.GetData(),
		})
	}

	// Resolve the requested ranges into instance IDs; an empty result
	// means "restart every instance of the job".
	instances := convertInstanceIDRangesToSlice(
		req.GetRestartSpec().GetRanges(), restartConfig.GetInstanceCount())
	if len(instances) == 0 {
		for instID := uint32(0); instID < restartConfig.GetInstanceCount(); instID++ {
			instances = append(instances, instID)
		}
	}

	updateID, newEntityVersion, err := job.CreateWorkflow(
		ctx,
		models.WorkflowType_RESTART,
		&pbupdate.UpdateConfig{
			BatchSize: req.GetRestartSpec().GetBatchSize(),
			InPlace:   req.GetRestartSpec().GetInPlace(),
		},
		req.GetVersion(),
		cached.WithInstanceToProcess(
			nil,
			instances,
			nil),
		cached.WithConfig(
			&restartConfig,
			currentConfig,
			addOn,
			&restartSpec,
		),
		opaque,
	)

	// Even when CreateWorkflow errors it is unknown whether the update ID
	// was persisted in the job runtime, so enqueue the update to the goal
	// state engine regardless: a persisted update will start running, an
	// unpersisted one will be aborted and the cache reloaded from DB.
	if len(updateID.GetValue()) > 0 {
		h.goalStateDriver.EnqueueUpdate(id, updateID, time.Now())
	}
	if err != nil {
		return nil, err
	}

	return &svc.RestartJobResponse{Version: newEntityVersion}, nil
}