func()

in pkg/jobmgr/jobsvc/stateless/handler.go [951:1074]


func (h *serviceHandler) GetJob(
	ctx context.Context,
	req *svc.GetJobRequest) (resp *svc.GetJobResponse, err error) {
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("StatelessJobSvc.GetJob failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}

		log.WithField("req", req).
			WithField("headers", headers).
			Debug("StatelessJobSvc.GetJob succeeded")
	}()

	// Get the summary only
	if req.GetSummaryOnly() == true {
		return h.getJobSummary(ctx, req.GetJobId())
	}

	// Get the configuration for a given version only
	if req.GetVersion() != nil {
		return h.getJobConfigurationWithVersion(ctx, req.GetJobId(), req.GetVersion())
	}

	pelotonJobID := &peloton.JobID{Value: req.GetJobId().GetValue()}

	// Get the latest configuration and runtime
	jobRuntime, err := h.jobRuntimeOps.Get(
		ctx,
		pelotonJobID,
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to get job status")
	}

	var wg sync.WaitGroup
	var secretVolumes []*mesos.Volume
	var jobConfig *pbjob.JobConfig
	var updateInfo *models.UpdateModel
	var workflowEvents []*stateless.WorkflowEvent
	errs := make(chan error)

	wg.Add(1)
	go func() {
		defer wg.Done()

		var er error
		if jobConfig, _, er = h.jobConfigOps.Get(
			ctx,
			pelotonJobID,
			jobRuntime.GetConfigurationVersion(),
		); er != nil {
			errs <- errors.Wrap(er, "failed to get job spec")
		}

		// Do not display the secret volumes in defaultconfig that were added by
		// handleSecrets. They should remain internal to peloton logic.
		// Secret ID and Path should be returned using the peloton.Secret
		// proto message.
		secretVolumes = util.RemoveSecretVolumesFromJobConfig(jobConfig)
	}()

	if len(jobRuntime.GetUpdateID().GetValue()) > 0 {
		wg.Add(2)

		go func() {
			defer wg.Done()

			var er error
			if updateInfo, er = h.updateStore.GetUpdate(
				ctx,
				jobRuntime.GetUpdateID(),
			); er != nil {
				errs <- errors.Wrap(er, "failed to get update information")
			}
		}()

		go func() {
			defer wg.Done()

			var er error
			if workflowEvents, er = h.jobUpdateEventsOps.GetAll(
				ctx,
				jobRuntime.GetUpdateID()); er != nil {
				errs <- errors.Wrap(er, "fail to get job update events")
			}
		}()
	}

	go func() {
		wg.Wait()
		close(errs)
	}()

	for e := range errs {
		if e != nil {
			err = multierror.Append(err, e)
		}
	}

	if err != nil {
		return nil, err
	}

	return &svc.GetJobResponse{
		JobInfo: &stateless.JobInfo{
			JobId:  req.GetJobId(),
			Spec:   api.ConvertJobConfigToJobSpec(jobConfig),
			Status: api.ConvertRuntimeInfoToJobStatus(jobRuntime, updateInfo),
		},
		Secrets: api.ConvertV0SecretsToV1Secrets(
			jobmgrtask.CreateSecretsFromVolumes(secretVolumes)),
		WorkflowInfo: api.ConvertUpdateModelToWorkflowInfo(
			jobRuntime,
			updateInfo,
			workflowEvents,
			nil),
	}, nil
}