func()

in internal/master/map_task_master.go [664:698]


// batchHandleRunningProgress assigns each start-container request to a worker
// chosen by selectWorker, grouping the requests per worker into
// worker2ReqsWithNormal or worker2ReqsWithFailover (both mutated in place),
// and bumps the per-task and per-worker running counters.
//
// If no worker is available, the whole job instance is marked failed and
// dispatching stops; requests already grouped are left as-is.
func (m *MapTaskMaster) batchHandleRunningProgress(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest,
	worker2ReqsWithNormal map[string][]*schedulerx.MasterStartContainerRequest, worker2ReqsWithFailover map[string][]*schedulerx.MasterStartContainerRequest) {
	for _, request := range masterStartContainerRequests {
		workerIdAddr := m.selectWorker()
		if workerIdAddr == "" {
			m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(), processor.InstanceStatusFailed, "all worker is down!")
			break
		}

		workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
		// append on a missing key yields append(nil, request), so no
		// existence check is needed.
		if request.GetFailover() {
			worker2ReqsWithFailover[workerIdAddr] = append(worker2ReqsWithFailover[workerIdAddr], request)
		} else {
			// Fix: the original appended to worker2ReqsWithFailover here,
			// discarding previously grouped normal requests for this worker
			// and mixing failover requests into the normal map.
			worker2ReqsWithNormal[workerIdAddr] = append(worker2ReqsWithNormal[workerIdAddr], request)
		}
		if val, ok := m.taskProgressMap.Load(request.GetTaskName()); ok {
			val.(*common.TaskProgressCounter).IncrementRunning()
		}
		// LoadOrStore is a no-op when the key already exists, so the prior
		// Load-then-store check was redundant.
		if workerAddr != "" {
			m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
		}
		if val, ok := m.workerProgressMap.Load(workerAddr); ok {
			val.(*common.WorkerProgressCounter).IncrementTotal()
			val.(*common.WorkerProgressCounter).IncrementRunning()
		}
	}
}