in pkg/aurorabridge/handler.go [410:589]
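// getScheduledTasks converts Peloton pod infos into Aurora ScheduledTask
// structs. For each pod it queries the current run and, depending on the
// configured pod runs depth, previous runs as well, fetching pod events
// concurrently and applying the supplied task filter to the results.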
func (h *ServiceHandler) getScheduledTasks(
	ctx context.Context,
	jobSummary *stateless.JobSummary,
	podInfos []*pod.PodInfo,
	filter *taskFilter,
) ([]*api.ScheduledTask, error) {
	jobID := jobSummary.GetJobId()
	podRunsDepth := getPodRunsLimit(
		uint32(len(podInfos)),
		uint32(h.config.GetTasksPodMax),
		uint32(h.config.PodRunsDepth),
	)
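	// Build one input per pod run so the event fetches below can be fanned
	// out concurrently: index 0 targets the current run, later indices target
	// previous runs addressed by reconstructed Mesos task IDs.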
	var inputs []interface{}
	for _, p := range podInfos {
		podSpec := p.GetSpec()
		podID := p.GetStatus().GetPodId()
		podName := podSpec.GetPodName()

		runID, err := util.ParseRunID(podID.GetValue())
		if err != nil {
			return nil, fmt.Errorf("failed to parse pod id: %s", err)
		}

		_, instanceID, err := util.ParseTaskID(podName.GetValue())
		if err != nil {
			return nil, fmt.Errorf("failed to parse pod name: %s", err)
		}

		// When PodRunsDepth is 1, query only the current run's pods; when it
		// is larger than 1, query the current run plus previous runs.
		for i := uint64(0); i < uint64(podRunsDepth); i++ {
			newRunID := runID - i
			if newRunID == 0 {
				// No more previous run pods.
				break
			}

			newPodID := &peloton.PodID{
				Value: util.CreateMesosTaskID(&v0peloton.JobID{
					Value: jobID.GetValue(),
				}, instanceID, newRunID).GetValue(),
			}

			taskInput := &getScheduledTaskInput{
				podName:    podName,
				instanceID: instanceID,
			}
			if i == 0 {
				// Attach job summary and pod spec for the current run; leave
				// podID nil so that the current run is queried.
				taskInput.jobSummary = jobSummary
				taskInput.podSpec = podSpec
			} else {
				// Attach the reconstructed pod ID for a previous run.
				taskInput.podID = newPodID
			}
			inputs = append(inputs, taskInput)
		}
	}
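	// Cache full job info per entity version so that previous-run pods
	// sharing the same job version trigger only one lookup; the mutex guards
	// the map against concurrent mapper workers.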
	lock := &sync.Mutex{}
	jobInfoVersionMap := make(map[string]*stateless.JobInfo)
	getJobInfoForVersion := func(podVersion string) (*stateless.JobInfo, error) {
		lock.Lock()
		defer lock.Unlock()

		var jobInfo *stateless.JobInfo
		var err error
		jobInfo, ok := jobInfoVersionMap[podVersion]
		if !ok {
			jobInfo, err = h.getFullJobInfoByVersion(
				ctx,
				jobID,
				&peloton.EntityVersion{Value: podVersion},
			)
			if err != nil {
				return nil, fmt.Errorf("get job info by version: %s", err)
			}
			jobInfoVersionMap[podVersion] = jobInfo
		}
		return jobInfo, nil
	}
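	// Mapper: fetch pod events for a single pod run and convert them into an
	// Aurora ScheduledTask. For previous runs the matching job config is
	// resolved via the pod's entity version.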
	f := func(ctx context.Context, input interface{}) (interface{}, error) {
		taskInput, ok := input.(*getScheduledTaskInput)
		if !ok {
			return nil, fmt.Errorf("failed to cast to get scheduled task input")
		}
		podName := taskInput.podName
		podID := taskInput.podID
		instanceID := taskInput.instanceID

		podEvents, err := h.getPodEvents(
			ctx,
			podName,
			podID,
		)
		if err != nil {
			return nil, fmt.Errorf(
				"get pod events for pod %q with pod id %q: %s",
				podName.GetValue(), podID.GetValue(), err)
		}
		if len(podEvents) == 0 {
			return nil, nil
		}

		var t *api.ScheduledTask
		if taskInput.jobSummary != nil && taskInput.podSpec != nil {
			// For current pod run
			t, err = ptoa.NewScheduledTask(
				taskInput.jobSummary,
				taskInput.podSpec,
				podEvents,
			)
			if err != nil {
				return nil, fmt.Errorf(
					"new scheduled task: %s", err)
			}
		} else {
			// For previous pod run
			podVersion := podEvents[0].GetVersion().GetValue()
			if len(podVersion) == 0 {
				return nil, fmt.Errorf(
					"cannot find pod version for pod: %s",
					podID.GetValue())
			}

			prevJobInfo, err := getJobInfoForVersion(podVersion)
			if err != nil {
				return nil, fmt.Errorf("get job info for version: %s", err)
			}

			prevJobSummary := convertJobInfoToJobSummary(prevJobInfo)
			prevPodSpec := getPodSpecForInstance(prevJobInfo.GetSpec(), instanceID)
			t, err = ptoa.NewScheduledTask(prevJobSummary, prevPodSpec, podEvents)
			if err != nil {
				return nil, fmt.Errorf(
					"new scheduled task: %s", err)
			}
		}

		if !filter.include(t) {
			return nil, nil
		}
		return t, nil
	}
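	// Fan the per-run conversions out across a bounded worker pool.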
	workers := h.config.getTasksWithoutConfigsWorkers(len(inputs))
	outputs, err := concurrency.Map(
		ctx,
		concurrency.MapperFunc(f),
		inputs,
		workers)
	if err != nil {
		return nil, err
	}
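	// Collect the results, dropping runs that produced no task (no events, or
	// excluded by the filter). The comma-ok assertion also guards against nil
	// interface outputs returned by the mapper.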
	var tasks []*api.ScheduledTask
	for _, o := range outputs {
		t, ok := o.(*api.ScheduledTask)
		if !ok || t == nil {
			continue
		}
		tasks = append(tasks, t)
	}
	return tasks, nil
}