in internal/master/map_task_master.go [664:698]
// batchHandleRunningProgress assigns every start-container request to a worker
// picked by selectWorker, groups the requests per worker into the two
// caller-supplied maps, and bumps the matching progress counters.
//
// Requests whose GetFailover() is true are collected in worker2ReqsWithFailover,
// all others in worker2ReqsWithNormal; both maps are keyed by the value returned
// from selectWorker and are mutated in place, so they must be non-nil.
//
// If selectWorker returns an empty string, the job instance is marked as failed
// and the remaining requests are left unassigned.
func (m *MapTaskMaster) batchHandleRunningProgress(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest,
	worker2ReqsWithNormal map[string][]*schedulerx.MasterStartContainerRequest, worker2ReqsWithFailover map[string][]*schedulerx.MasterStartContainerRequest) {
	for _, request := range masterStartContainerRequests {
		workerIdAddr := m.selectWorker()
		if workerIdAddr == "" {
			m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(), processor.InstanceStatusFailed, "all worker is down!")
			break
		}
		workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)

		// append on a missing key starts from a nil slice, so no existence
		// check is needed. Each request is appended to its own map: the
		// previous code appended normal requests onto the failover slice,
		// which discarded earlier normal requests for the same worker.
		if request.GetFailover() {
			worker2ReqsWithFailover[workerIdAddr] = append(worker2ReqsWithFailover[workerIdAddr], request)
		} else {
			worker2ReqsWithNormal[workerIdAddr] = append(worker2ReqsWithNormal[workerIdAddr], request)
		}

		// Task-level progress is only updated when a counter is already
		// registered for this task name.
		if val, ok := m.taskProgressMap.Load(request.GetTaskName()); ok {
			val.(*common.TaskProgressCounter).IncrementRunning()
		}

		// Create the worker-level counter on first use; the Load guard avoids
		// allocating a fresh counter when one is already stored.
		if _, ok := m.workerProgressMap.Load(workerAddr); workerAddr != "" && !ok {
			m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
		}
		if val, ok := m.workerProgressMap.Load(workerAddr); ok {
			counter := val.(*common.WorkerProgressCounter)
			counter.IncrementTotal()
			counter.IncrementRunning()
		}
	}
}