in pkg/jobmgr/jobsvc/stateless/handler.go [951:1074]
// GetJob returns information about the stateless job identified by the
// request's job ID.
//
// Depending on the request it returns one of:
//   - only the job summary, when SummaryOnly is set;
//   - the configuration for a specific version, when Version is set;
//   - otherwise the latest runtime and configuration, plus the update model
//     and workflow events when the runtime references an update.
//
// Any error returned is logged and converted to a YARPC error by the deferred
// handler before it reaches the caller.
func (h *serviceHandler) GetJob(
	ctx context.Context,
	req *svc.GetJobRequest) (resp *svc.GetJobResponse, err error) {
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)

		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("StatelessJobSvc.GetJob failed")
			// The named result is rewritten here so that every return path
			// yields a YARPC error.
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}

		log.WithField("req", req).
			WithField("headers", headers).
			Debug("StatelessJobSvc.GetJob succeeded")
	}()

	// Get the summary only
	if req.GetSummaryOnly() {
		return h.getJobSummary(ctx, req.GetJobId())
	}

	// Get the configuration for a given version only
	if req.GetVersion() != nil {
		return h.getJobConfigurationWithVersion(ctx, req.GetJobId(), req.GetVersion())
	}

	pelotonJobID := &peloton.JobID{Value: req.GetJobId().GetValue()}

	// Get the latest configuration and runtime. The runtime is fetched first
	// because the lookups below depend on its configuration version and
	// update ID.
	jobRuntime, err := h.jobRuntimeOps.Get(
		ctx,
		pelotonJobID,
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get job status")
	}

	var wg sync.WaitGroup
	var secretVolumes []*mesos.Volume
	var jobConfig *pbjob.JobConfig
	var updateInfo *models.UpdateModel
	var workflowEvents []*stateless.WorkflowEvent

	// errs is unbuffered; the range loop below drains it while the lookups
	// run, and it is closed once every lookup goroutine has finished.
	errs := make(chan error)

	wg.Add(1)
	go func() {
		defer wg.Done()

		var er error
		if jobConfig, _, er = h.jobConfigOps.Get(
			ctx,
			pelotonJobID,
			jobRuntime.GetConfigurationVersion(),
		); er != nil {
			errs <- errors.Wrap(er, "failed to get job spec")
			// Without a valid config there is nothing to strip secret
			// volumes from; stop here instead of operating on the result of
			// a failed read.
			return
		}

		// Do not display the secret volumes in defaultconfig that were added by
		// handleSecrets. They should remain internal to peloton logic.
		// Secret ID and Path should be returned using the peloton.Secret
		// proto message.
		secretVolumes = util.RemoveSecretVolumesFromJobConfig(jobConfig)
	}()

	// Update details are only looked up when the runtime references an update.
	if len(jobRuntime.GetUpdateID().GetValue()) > 0 {
		wg.Add(2)

		go func() {
			defer wg.Done()

			var er error
			if updateInfo, er = h.updateStore.GetUpdate(
				ctx,
				jobRuntime.GetUpdateID(),
			); er != nil {
				errs <- errors.Wrap(er, "failed to get update information")
			}
		}()

		go func() {
			defer wg.Done()

			var er error
			if workflowEvents, er = h.jobUpdateEventsOps.GetAll(
				ctx,
				jobRuntime.GetUpdateID()); er != nil {
				errs <- errors.Wrap(er, "fail to get job update events")
			}
		}()
	}

	// Close errs after all lookups complete so the range loop terminates.
	go func() {
		wg.Wait()
		close(errs)
	}()

	// Collect every failure rather than just the first one.
	for e := range errs {
		if e != nil {
			err = multierror.Append(err, e)
		}
	}

	if err != nil {
		return nil, err
	}

	return &svc.GetJobResponse{
		JobInfo: &stateless.JobInfo{
			JobId:  req.GetJobId(),
			Spec:   api.ConvertJobConfigToJobSpec(jobConfig),
			Status: api.ConvertRuntimeInfoToJobStatus(jobRuntime, updateInfo),
		},
		Secrets: api.ConvertV0SecretsToV1Secrets(
			jobmgrtask.CreateSecretsFromVolumes(secretVolumes)),
		WorkflowInfo: api.ConvertUpdateModelToWorkflowInfo(
			jobRuntime,
			updateInfo,
			workflowEvents,
			nil),
	}, nil
}