func()

in pkg/jobmgr/jobsvc/stateless/handler.go [1429:1543]


func (h *serviceHandler) ListJobWorkflows(
	ctx context.Context,
	req *svc.ListJobWorkflowsRequest) (resp *svc.ListJobWorkflowsResponse, err error) {
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("JobSVC.ListJobWorkflows failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}

		log.WithField("request", req).
			WithField("headers", headers).
			WithField("num_of_workflows", len(resp.GetWorkflowInfos())).
			Debug("JobSVC.ListJobWorkflows succeeded")
	}()

	if len(req.GetJobId().GetValue()) == 0 {
		return nil, yarpcerrors.InvalidArgumentErrorf("no job id provided")
	}

	updateIDs, err := h.updateStore.GetUpdatesForJob(ctx, req.GetJobId().GetValue())
	if err != nil {
		return nil, err
	}

	if req.GetUpdatesLimit() > 0 {
		updateIDs = updateIDs[:util.Min(uint32(len(updateIDs)), req.GetUpdatesLimit())]
	}

	pelotonJobID := &peloton.JobID{Value: req.GetJobId().GetValue()}

	jobRuntime, err := h.jobRuntimeOps.Get(ctx, pelotonJobID)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job runtime")
	}

	var sequence int
	var wg sync.WaitGroup
	ch := make(chan updateInfoChan)
	updateInfos := make([]*stateless.WorkflowInfo, len(updateIDs))
	for _, updateID := range updateIDs {
		wg.Add(1)

		go func(updateID *peloton.UpdateID, sequence int) {
			defer wg.Done()

			updateModel, err := h.updateStore.GetUpdate(ctx, updateID)
			if err != nil {
				ch <- updateInfoChan{
					err: err,
				}
				return
			}

			workflowEvents, err := h.jobUpdateEventsOps.GetAll(
				ctx,
				updateID)
			if err != nil {
				ch <- updateInfoChan{
					err: errors.Wrap(err, "fail to get job workflow events"),
				}
				return
			}

			var instanceWorkflowEvents []*stateless.WorkflowInfoInstanceWorkflowEvents
			if req.GetInstanceEvents() {
				instanceWorkflowEvents, err = h.getInstanceWorkflowEvents(
					ctx,
					updateModel,
					req.GetInstanceEventsLimit(),
				)
				if err != nil {
					ch <- updateInfoChan{
						err: errors.Wrap(err, "fail to get instance workflow events"),
					}
					return
				}
			}

			ch <- updateInfoChan{
				updateInfo: api.ConvertUpdateModelToWorkflowInfo(
					jobRuntime,
					updateModel,
					workflowEvents,
					instanceWorkflowEvents),
				err:      nil,
				sequence: sequence,
			}
		}(updateID, sequence)
		sequence++
	}

	go func() {
		wg.Wait()
		close(ch)
	}()

	for update := range ch {
		if update.err != nil {
			err = update.err
			continue
		}
		updateInfos[update.sequence] = update.updateInfo
	}

	if err != nil {
		return nil, err
	}

	return &svc.ListJobWorkflowsResponse{WorkflowInfos: updateInfos}, nil
}