func()

in internal/master/sharding_task_master.go [145:191]


// BatchHandlePulledProgress assigns each pulled start-container request to a
// worker and records the pull in the master's progress bookkeeping.
//
// When remoteWorker is non-empty, every request is assigned to it; otherwise a
// worker is picked per request via m.selectWorker. If no worker can be
// determined for a request, the job instance status is updated to
// processor.InstanceStatusFailed and the remaining requests are not processed.
//
// It returns two maps keyed by the worker id-address: the first groups the
// non-failover requests, the second groups the failover requests.
func (m *ShardingTaskMaster) BatchHandlePulledProgress(masterStartContainerRequests []*schedulerx.MasterStartContainerRequest,
	remoteWorker string) (map[string][]*schedulerx.MasterStartContainerRequest, map[string][]*schedulerx.MasterStartContainerRequest) {
	var (
		worker2ReqsWithNormal   = make(map[string][]*schedulerx.MasterStartContainerRequest)
		worker2ReqsWithFailover = make(map[string][]*schedulerx.MasterStartContainerRequest)
	)
	for _, request := range masterStartContainerRequests {
		workerIdAddr := remoteWorker
		if workerIdAddr == "" {
			workerIdAddr = m.selectWorker()
		}
		if workerIdAddr == "" {
			// No explicit worker and none could be selected: fail the instance
			// and stop dispatching the rest of the batch.
			m.updateNewInstanceStatus(m.GetSerialNum(), m.GetJobInstanceInfo().GetJobInstanceId(), processor.InstanceStatusFailed, "all worker is down!")
			break
		}

		workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
		// Indexing a missing key yields a nil slice, which append handles, so
		// no existence check is needed before appending.
		if request.GetFailover() {
			worker2ReqsWithFailover[workerIdAddr] = append(worker2ReqsWithFailover[workerIdAddr], request)
		} else {
			// Append to the normal bucket itself; appending to the failover
			// bucket here would drop earlier normal requests for this worker
			// and mix failover requests into the normal group.
			worker2ReqsWithNormal[workerIdAddr] = append(worker2ReqsWithNormal[workerIdAddr], request)

			// Only non-failover requests bump the per-task pulled counter.
			if counter, ok := m.taskProgressMap.Load(request.GetTaskName()); ok {
				counter.(*common.TaskProgressCounter).IncrementOnePulled()
			}
		}

		// Create the per-worker counter on first use. Load is tried first so a
		// new counter is only constructed when the entry is absent.
		if _, ok := m.workerProgressMap.Load(workerAddr); workerAddr != "" && !ok {
			m.workerProgressMap.LoadOrStore(workerAddr, common.NewWorkerProgressCounter(workerAddr))
		}
		if val, ok := m.workerProgressMap.Load(workerAddr); ok {
			workerCounter := val.(*common.WorkerProgressCounter)
			workerCounter.IncrementTotal()
			workerCounter.IncrementPulled()
		}

		// Track the task as initialized on the chosen worker.
		m.shardingTaskStatusMap.Store(request.GetTaskId(), taskstatus.NewShardingTaskStatus(request.GetTaskId(), workerAddr, int32(taskstatus.TaskStatusInit)))
	}
	return worker2ReqsWithNormal, worker2ReqsWithFailover
}