func()

in internal/master/map_task_master.go [700:748]


// BatchHandlePulledProgress assigns each of the given start-container requests
// to a worker and groups them by that worker, keeping normal and failover
// requests in separate maps. It also bumps the pulled counters kept in
// m.taskProgressMap and m.workerProgressMap.
//
// If remoteWorker is non-empty, every request is assigned to it; otherwise a
// worker is chosen per request via m.selectWorker(). When no worker can be
// determined, the job instance is marked as failed and the remaining requests
// are not processed; the requests grouped so far are still returned.
//
// The first return value maps worker id-address to its normal requests, the
// second maps worker id-address to its failover requests.
func (m *MapTaskMaster) BatchHandlePulledProgress(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest,
	remoteWorker string) (map[string][]*schedulerx.MasterStartContainerRequest, map[string][]*schedulerx.MasterStartContainerRequest) {
	var (
		worker2ReqsWithNormal   = make(map[string][]*schedulerx.MasterStartContainerRequest)
		worker2ReqsWithFailover = make(map[string][]*schedulerx.MasterStartContainerRequest)
	)
	for _, request := range masterStartContainerRequests {
		workerIDAddr := remoteWorker
		if workerIDAddr == "" {
			workerIDAddr = m.selectWorker()
		}
		if workerIDAddr == "" {
			// No worker could be selected: fail the instance and stop dispatching.
			if err := m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(),
				processor.InstanceStatusFailed, "all worker is down!"); err != nil {
				logger.Errorf("updateNewInstanceStatus failed in BatchHandlePulledProgress, err=%s", err.Error())
			}
			break
		}

		workerAddr := actorcomm.GetRealWorkerAddr(workerIDAddr)

		// append on a missing key starts from a nil slice, so no existence
		// check is needed before grouping.
		if request.GetFailover() {
			worker2ReqsWithFailover[workerIDAddr] = append(worker2ReqsWithFailover[workerIDAddr], request)
		} else {
			worker2ReqsWithNormal[workerIDAddr] = append(worker2ReqsWithNormal[workerIDAddr], request)

			// Failover subtasks are not counted again at task level; only
			// normal requests increment the per-task pulled counter.
			if val, ok := m.taskProgressMap.Load(request.GetTaskName()); ok {
				val.(*common.TaskProgressCounter).IncrementOnePulled()
			}
		}

		// Look the per-worker counter up once; create it lazily only on a miss
		// (and only for a non-empty address) so no counter is allocated when
		// one already exists.
		counter, ok := m.workerProgressMap.Load(workerAddr)
		if !ok && workerAddr != "" {
			counter, _ = m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
			ok = true
		}
		if ok {
			workerCounter := counter.(*common.WorkerProgressCounter)
			workerCounter.IncrementTotal()
			workerCounter.IncrementPulled()
		}
	}
	return worker2ReqsWithNormal, worker2ReqsWithFailover
}