batchHandleContainers()

in internal/master/map_task_master.go [548:619]


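// batchHandleContainers persists a batch of task-start requests for the given worker and,
// in push mode, dispatches them to that worker, handling success, timeout, and failure results.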
func (m *MapTaskMaster) batchHandleContainers(workerIdAddr string, reqs []*schedulerx.MasterStartContainerRequest, isFailover bool, dispatchMode common.TaskDispatchMode) {
	parts := strings.Split(workerIdAddr, "@")
	workerId := parts[0]
	workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)

	logger.Debugf("jobInstanceId=%d, batch dispatch, worker:%s, size:%d", m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs))
	m.batchHandlePersistence(workerId, workerAddr, reqs, isFailover)
	if dispatchMode == common.TaskDispatchModePush {
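		// Push mode: actively deliver the batched start requests to the worker's container router actor.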
		startTime := time.Now()
		// FIXME
		// workerAddr = actorcomm.GetRemoteWorkerAddr(workerAddr)

		containerRouterActorPid := actorcomm.GetContainerRouterPid(workerAddr)
		req := &schedulerx.MasterBatchStartContainersRequest{
			JobInstanceId: proto.Int64(m.GetJobInstanceInfo().GetJobInstanceId()),
			JobId:         proto.Int64(m.GetJobInstanceInfo().GetJobId()),
			StartReqs:     reqs,
		}

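		// Block for up to 15 seconds waiting for the worker to acknowledge the batch.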
		future := m.actorCtx.RequestFuture(containerRouterActorPid, req, 15*time.Second)
		result, err := future.Result()
		if err == nil {
			// Trigger success callback
			resp := result.(*schedulerx.MasterBatchStartContainersResponse)
			if resp.GetSuccess() {
				logger.Infof("jobInstanceId=%d, batch start containers successfully, size:%d, worker=%s, cost=%dms",
					m.GetJobInstanceInfo().GetJobInstanceId(), len(reqs), workerIdAddr, time.Since(startTime).Milliseconds())
			} else {
				logger.Errorf("jobInstanceId=%d, batch start containers failed, worker=%s, response=%s, size:%d",
					m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, resp.GetMessage(), len(reqs))

				// TODO: on a failed dispatch, should we retry another worker or mark the tasks as failed directly? This may need to depend on the response.
				// For now the tasks are marked as failed directly.
				m.batchUpdateTaskStatus(workerId, workerAddr, reqs)
			}
		} else {
			// Trigger timeout or failure callback
			if errors.Is(err, actor.ErrTimeout) {
				if len(m.GetJobInstanceInfo().GetAllWorkers()) == 1 {
					logger.Errorf("jobInstanceId:%d, batch dispatch tasks failed due to only existed worker[%s] was down, size:%d, error=%s",
						m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs), err.Error())
					m.batchUpdateTaskStatus(workerId, workerAddr, reqs)
					return
				}
				logger.Warnf("jobInstanceId=%d, worker[%s] is down, try another worker, size:%d",
					m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs))

				// TODO: the worker is down, so remove it first and then try dispatching to another worker. The retry is skipped here for now; liveness probing is left to a dedicated routine, so no check is done here.
				m.GetJobInstanceInfo().SetAllWorkers(utils.RemoveSliceElem(m.GetJobInstanceInfo().GetAllWorkers(), workerIdAddr))

				// Send timeout, fallback to init status
				var taskIds []int64
				for _, req := range reqs {
					taskIds = append(taskIds, req.GetTaskId())
				}

				affectCnt, err := m.taskPersistence.UpdateTaskStatus(m.GetJobInstanceInfo().GetJobInstanceId(), taskIds, taskstatus.TaskStatusInit, workerId, workerAddr)
				if err != nil {
					logger.Errorf("jobInstanceId=%d, timeout return init error", m.GetJobInstanceInfo().GetJobInstanceId())
					m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, "timeout dispatch return init error")
				}
				if val, ok := m.workerProgressMap.Load(workerAddr); ok {
					val.(*common.WorkerProgressCounter).DecrementRunning(affectCnt)
				}
			} else {
				// If there are other exceptions (such as serialization failure, worker cannot be found), directly set the task to failure.
				logger.Errorf("jobInstanceId:%d, batch dispatch Tasks error, worker=%s, size:%d, error=%s", m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs), err.Error())
				m.batchUpdateTaskStatus(workerId, workerAddr, reqs)
			}
		}
	}
}