in pkg/jobmgr/jobsvc/stateless/handler.go [1429:1543]
// ListJobWorkflows returns the workflow info for the updates of the job
// identified by req.JobId.
//
// An empty job id is rejected with an InvalidArgument error. When
// req.UpdatesLimit is positive, only the leading UpdatesLimit update ids
// returned by the update store are considered. Every update is loaded in its
// own goroutine and the results are stored in the response at the same
// position as the corresponding update id. If any update fails to load, the
// whole call fails; when several fail, the error received last is the one
// returned. Any error leaving this method is logged and passed through
// yarpcutil.ConvertToYARPCError by the deferred function.
func (h *serviceHandler) ListJobWorkflows(
	ctx context.Context,
	req *svc.ListJobWorkflowsRequest) (resp *svc.ListJobWorkflowsResponse, err error) {
	defer func() {
		entry := log.WithField("request", req).
			WithField("headers", yarpcutil.GetHeaders(ctx))
		if err == nil {
			entry.WithField("num_of_workflows", len(resp.GetWorkflowInfos())).
				Debug("JobSVC.ListJobWorkflows succeeded")
			return
		}
		entry.WithError(err).Warn("JobSVC.ListJobWorkflows failed")
		err = yarpcutil.ConvertToYARPCError(err)
	}()

	jobIDValue := req.GetJobId().GetValue()
	if len(jobIDValue) == 0 {
		return nil, yarpcerrors.InvalidArgumentErrorf("no job id provided")
	}

	updateIDs, err := h.updateStore.GetUpdatesForJob(ctx, jobIDValue)
	if err != nil {
		return nil, err
	}
	if limit := req.GetUpdatesLimit(); limit > 0 {
		updateIDs = updateIDs[:util.Min(uint32(len(updateIDs)), limit)]
	}

	jobRuntime, err := h.jobRuntimeOps.Get(ctx, &peloton.JobID{Value: jobIDValue})
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job runtime")
	}

	// loadWorkflow collects the update model, its workflow events and, when
	// requested, its instance workflow events, and packages them together
	// with the slot the result belongs in. On failure only err is populated.
	// The err declared here is local to the closure, so concurrent calls do
	// not touch the method's named return value.
	loadWorkflow := func(id *peloton.UpdateID, position int) updateInfoChan {
		model, err := h.updateStore.GetUpdate(ctx, id)
		if err != nil {
			return updateInfoChan{err: err}
		}

		events, err := h.jobUpdateEventsOps.GetAll(ctx, id)
		if err != nil {
			return updateInfoChan{
				err: errors.Wrap(err, "fail to get job workflow events"),
			}
		}

		var instanceEvents []*stateless.WorkflowInfoInstanceWorkflowEvents
		if req.GetInstanceEvents() {
			instanceEvents, err = h.getInstanceWorkflowEvents(
				ctx,
				model,
				req.GetInstanceEventsLimit(),
			)
			if err != nil {
				return updateInfoChan{
					err: errors.Wrap(err, "fail to get instance workflow events"),
				}
			}
		}

		return updateInfoChan{
			updateInfo: api.ConvertUpdateModelToWorkflowInfo(
				jobRuntime,
				model,
				events,
				instanceEvents),
			err:      nil,
			sequence: position,
		}
	}

	results := make(chan updateInfoChan)
	workflowInfos := make([]*stateless.WorkflowInfo, len(updateIDs))

	var pending sync.WaitGroup
	pending.Add(len(updateIDs))
	for position, id := range updateIDs {
		go func(id *peloton.UpdateID, position int) {
			defer pending.Done()
			results <- loadWorkflow(id, position)
		}(id, position)
	}

	// Close the channel once every loader has delivered its result so the
	// receive loop below terminates.
	go func() {
		pending.Wait()
		close(results)
	}()

	// Drain every result, even after a failure, so no loader stays blocked
	// on the unbuffered channel.
	var loadErr error
	for result := range results {
		if result.err == nil {
			workflowInfos[result.sequence] = result.updateInfo
		} else {
			loadErr = result.err
		}
	}
	if loadErr != nil {
		return nil, loadErr
	}

	return &svc.ListJobWorkflowsResponse{WorkflowInfos: workflowInfos}, nil
}