in internal/master/map_task_master.go [700:748]
// BatchHandlePulledProgress groups a batch of pulled start-container requests
// by the worker they are assigned to and bumps the related progress counters.
//
// Every request is assigned to remoteWorker when it is non-empty; otherwise a
// worker is chosen per request through m.selectWorker(). When no worker can be
// determined, the job instance status is updated to InstanceStatusFailed (an
// error from that update is only logged) and the remaining requests are not
// processed; whatever was grouped before that point is still returned.
//
// The first returned map holds the non-failover requests and the second holds
// the failover requests. Both are keyed by the assigned worker string (the
// value before it is converted with actorcomm.GetRealWorkerAddr).
func (m *MapTaskMaster) BatchHandlePulledProgress(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest,
	remoteWorker string) (map[string][]*schedulerx.MasterStartContainerRequest, map[string][]*schedulerx.MasterStartContainerRequest) {
	normalByWorker := make(map[string][]*schedulerx.MasterStartContainerRequest)
	failoverByWorker := make(map[string][]*schedulerx.MasterStartContainerRequest)

	for _, req := range masterStartContainerRequests {
		// Prefer the caller-supplied worker; fall back to picking one.
		target := remoteWorker
		if target == "" {
			target = m.selectWorker()
		}
		if target == "" {
			// Nothing can run this batch: fail the instance and stop.
			err := m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(),
				processor.InstanceStatusFailed, "all worker is down!")
			if err != nil {
				logger.Errorf("updateNewInstanceStatus failed in BatchHandlePulledProgress, err=%s", err.Error())
			}
			break
		}
		realAddr := actorcomm.GetRealWorkerAddr(target)

		// Appending to a missing map entry starts from a nil slice, so no
		// separate "first element" branch is needed.
		if req.GetFailover() {
			failoverByWorker[target] = append(failoverByWorker[target], req)
		} else {
			normalByWorker[target] = append(normalByWorker[target], req)
			// The subtasks of failover do not need to be counted anymore,
			// so the per-task pulled counter is only touched on this branch.
			if progress, found := m.taskProgressMap.Load(req.GetTaskName()); found {
				progress.(*common.TaskProgressCounter).IncrementOnePulled()
			}
		}

		// Lazily register a per-worker counter the first time a worker is seen.
		if realAddr != "" {
			if _, known := m.workerProgressMap.Load(realAddr); !known {
				m.workerProgressMap.LoadOrStore(realAddr, common.NewWorkerProgressCounter(realAddr))
			}
		}
		// The per-worker counter is updated for normal and failover requests alike.
		if entry, found := m.workerProgressMap.Load(realAddr); found {
			counter := entry.(*common.WorkerProgressCounter)
			counter.IncrementTotal()
			counter.IncrementPulled()
		}
	}
	return normalByWorker, failoverByWorker
}