func()

in pkg/aurorabridge/handler.go [410:589]


// getScheduledTasks builds the list of Aurora ScheduledTasks for the given
// pods of the job described by jobSummary.
//
// For every entry in podInfos one lookup is scheduled for the current run
// and, when the limit returned by getPodRunsLimit is larger than 1,
// additional lookups are scheduled for earlier runs (walking run ids
// downwards and stopping before run id 0). The lookups are executed through
// concurrency.Map with the worker count returned by
// h.config.getTasksWithoutConfigsWorkers.
//
// Runs that have no pod events, and tasks rejected by filter.include, are
// omitted from the result. An error is returned when a pod id or pod name
// cannot be parsed, or when any individual lookup fails.
func (h *ServiceHandler) getScheduledTasks(
	ctx context.Context,
	jobSummary *stateless.JobSummary,
	podInfos []*pod.PodInfo,
	filter *taskFilter,
) ([]*api.ScheduledTask, error) {
	jobID := jobSummary.GetJobId()

	podRunsDepth := getPodRunsLimit(
		uint32(len(podInfos)),
		uint32(h.config.GetTasksPodMax),
		uint32(h.config.PodRunsDepth),
	)

	// Expand every pod into one mapper input per run to be queried.
	var inputs []interface{}
	for _, p := range podInfos {
		podSpec := p.GetSpec()
		podID := p.GetStatus().GetPodId()
		podName := podSpec.GetPodName()

		runID, err := util.ParseRunID(podID.GetValue())
		if err != nil {
			return nil, fmt.Errorf("failed to parse pod id: %s", err)
		}

		_, instanceID, err := util.ParseTaskID(podName.GetValue())
		if err != nil {
			return nil, fmt.Errorf("failed to parse pod name: %s", err)
		}

		// when PodRunsDepth set to 1, query only current run pods, when set
		// to larger than 1, will query current plus previous run pods
		for i := uint64(0); i < uint64(podRunsDepth); i++ {
			newRunID := runID - i
			if newRunID == 0 {
				// No more previous run pods. Breaking here also keeps the
				// subtraction above from ever going below zero.
				break
			}

			taskInput := &getScheduledTaskInput{
				podName:    podName,
				instanceID: instanceID,
			}

			if i == 0 {
				// Attach for current run, leave podID to nil so that
				// current run will be queried
				taskInput.jobSummary = jobSummary
				taskInput.podSpec = podSpec
			} else {
				// Attach for previous run. The pod id is only needed on
				// this branch, so it is only built here.
				taskInput.podID = &peloton.PodID{
					Value: util.CreateMesosTaskID(&v0peloton.JobID{
						Value: jobID.GetValue(),
					}, instanceID, newRunID).GetValue(),
				}
			}

			inputs = append(inputs, taskInput)
		}
	}

	// jobInfoVersionMap caches job info by entity version so that previous
	// runs sharing a version reuse one fetch. It is accessed from the mapper
	// function below and is guarded by lock.
	lock := &sync.Mutex{}
	jobInfoVersionMap := make(map[string]*stateless.JobInfo)

	// getJobInfoForVersion returns the cached job info for podVersion,
	// fetching and caching it on a miss. The lock is held for the whole
	// call, including the fetch; failed fetches are not cached.
	getJobInfoForVersion := func(podVersion string) (*stateless.JobInfo, error) {
		lock.Lock()
		defer lock.Unlock()

		if cached, ok := jobInfoVersionMap[podVersion]; ok {
			return cached, nil
		}

		jobInfo, err := h.getFullJobInfoByVersion(
			ctx,
			jobID,
			&peloton.EntityVersion{Value: podVersion},
		)
		if err != nil {
			return nil, fmt.Errorf("get job info by version: %s", err)
		}

		jobInfoVersionMap[podVersion] = jobInfo
		return jobInfo, nil
	}

	// f converts a single getScheduledTaskInput into a ScheduledTask. It
	// returns (nil, nil) when the run has no pod events or when the task is
	// rejected by the filter.
	f := func(ctx context.Context, input interface{}) (interface{}, error) {
		taskInput, ok := input.(*getScheduledTaskInput)
		if !ok {
			return nil, fmt.Errorf("failed to cast to get scheduled task input")
		}

		podName := taskInput.podName
		podID := taskInput.podID
		instanceID := taskInput.instanceID

		podEvents, err := h.getPodEvents(
			ctx,
			podName,
			podID,
		)
		if err != nil {
			return nil, fmt.Errorf(
				"get pod events for pod %q with pod id %q: %s",
				podName.GetValue(), podID.GetValue(), err)
		}
		if len(podEvents) == 0 {
			return nil, nil
		}

		var t *api.ScheduledTask

		if taskInput.jobSummary != nil && taskInput.podSpec != nil {
			// For current pod run
			t, err = ptoa.NewScheduledTask(
				taskInput.jobSummary,
				taskInput.podSpec,
				podEvents,
			)
			if err != nil {
				return nil, fmt.Errorf(
					"new scheduled task: %s", err)
			}
		} else {
			// For previous pod run: the job summary and pod spec are
			// rebuilt from the job info at the version recorded on the
			// first pod event.
			podVersion := podEvents[0].GetVersion().GetValue()
			if len(podVersion) == 0 {
				return nil, fmt.Errorf(
					"cannot find pod version for pod: %s",
					podID.GetValue())
			}

			prevJobInfo, err := getJobInfoForVersion(podVersion)
			if err != nil {
				return nil, fmt.Errorf("get job info for version: %s", err)
			}

			prevJobSummary := convertJobInfoToJobSummary(prevJobInfo)
			prevPodSpec := getPodSpecForInstance(prevJobInfo.GetSpec(), instanceID)

			t, err = ptoa.NewScheduledTask(prevJobSummary, prevPodSpec, podEvents)
			if err != nil {
				return nil, fmt.Errorf(
					"new scheduled task: %s", err)
			}
		}

		if !filter.include(t) {
			return nil, nil
		}

		return t, nil
	}

	workers := h.config.getTasksWithoutConfigsWorkers(len(inputs))

	outputs, err := concurrency.Map(
		ctx,
		concurrency.MapperFunc(f),
		inputs,
		workers)
	if err != nil {
		return nil, err
	}

	var tasks []*api.ScheduledTask
	for _, o := range outputs {
		// f returns an untyped nil for skipped runs. A single-value type
		// assertion on a nil interface panics, so use the comma-ok form to
		// skip such entries (and any typed nil pointer) safely.
		t, ok := o.(*api.ScheduledTask)
		if !ok || t == nil {
			continue
		}
		tasks = append(tasks, t)
	}
	return tasks, nil
}