in internal/master/map_task_master.go [548:619]
func (m *MapTaskMaster) batchHandleContainers(workerIdAddr string, reqs []*schedulerx.MasterStartContainerRequest, isFailover bool, dispatchMode common.TaskDispatchMode) {
	parts := strings.Split(workerIdAddr, "@")
	workerId := parts[0]
	workerAddr := actorcomm.GetRealWorkerAddr(workerIdAddr)
	logger.Debugf("jobInstanceId=%d, batch dispatch, worker:%s, size:%d", m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs))
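	// Persist this batch of task requests before dispatching them.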
	m.batchHandlePersistence(workerId, workerAddr, reqs, isFailover)
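	// Only push mode requires the master to actively send start requests to the worker.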
	if dispatchMode == common.TaskDispatchModePush {
		startTime := time.Now()
		// FIXME
		// workerAddr = actorcomm.GetRemoteWorkerAddr(workerAddr)
		containerRouterActorPid := actorcomm.GetContainerRouterPid(workerAddr)
		req := &schedulerx.MasterBatchStartContainersRequest{
			JobInstanceId: proto.Int64(m.GetJobInstanceInfo().GetJobInstanceId()),
			JobId:         proto.Int64(m.GetJobInstanceInfo().GetJobId()),
			StartReqs:     reqs,
		}
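		// Ask the worker's container-router actor to start the containers, waiting up to 15s for the batch response.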
		future := m.actorCtx.RequestFuture(containerRouterActorPid, req, 15*time.Second)
		result, err := future.Result()
		if err == nil {
			// Trigger success callback
			resp := result.(*schedulerx.MasterBatchStartContainersResponse)
			if resp.GetSuccess() {
				logger.Infof("jobInstanceId=%d, batch start containers successfully, size:%d, worker=%s, cost=%dms",
					m.GetJobInstanceInfo().GetJobInstanceId(), len(reqs), workerIdAddr, time.Since(startTime).Milliseconds())
			} else {
				logger.Errorf("jobInstanceId=%d, batch start containers failed, worker=%s, response=%s, size:%d",
					m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, resp.GetMessage(), len(reqs))
				// TODO: when sending fails, should we retry on another worker or mark the tasks as failed directly? This may need to depend on the response value.
				// Currently it is set to fail directly
				m.batchUpdateTaskStatus(workerId, workerAddr, reqs)
			}
		} else {
			// Trigger timeout or failure callback
			if errors.Is(err, actor.ErrTimeout) {
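				// If this was the only worker, there is nowhere left to retry: mark the tasks as failed.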
				if len(m.GetJobInstanceInfo().GetAllWorkers()) == 1 {
					logger.Errorf("jobInstanceId:%d, batch dispatch tasks failed because the only existing worker[%s] was down, size:%d, error=%s",
						m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs), err.Error())
					m.batchUpdateTaskStatus(workerId, workerAddr, reqs)
					return
				}
				logger.Warnf("jobInstanceId=%d, worker[%s] is down, try another worker, size:%d",
					m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs))
				// TODO: the worker is down; remove it first, then try dispatching to another worker. That retry is skipped here for now:
				// liveness probing is handled by a dedicated thread, so no judgment is made at this point.
				m.GetJobInstanceInfo().SetAllWorkers(utils.RemoveSliceElem(m.GetJobInstanceInfo().GetAllWorkers(), workerIdAddr))
				// Send timed out, fall back to init status
				var taskIds []int64
				for _, req := range reqs {
					taskIds = append(taskIds, req.GetTaskId())
				}
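				// Reset the dispatched tasks to the init status so they can be re-dispatched to another worker.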
				affectCnt, err := m.taskPersistence.UpdateTaskStatus(m.GetJobInstanceInfo().GetJobInstanceId(), taskIds, taskstatus.TaskStatusInit, workerId, workerAddr)
				if err != nil {
					logger.Errorf("jobInstanceId=%d, failed to reset tasks to init status after dispatch timeout, error=%s", m.GetJobInstanceInfo().GetJobInstanceId(), err.Error())
					m.UpdateNewInstanceStatus(m.GetSerialNum(), processor.InstanceStatusFailed, "timeout dispatch return init error")
				}
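				// These tasks are no longer running on this worker, so reduce its running counter.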
				if val, ok := m.workerProgressMap.Load(workerAddr); ok {
					val.(*common.WorkerProgressCounter).DecrementRunning(affectCnt)
				}
			} else {
				// For other errors (such as a serialization failure or the worker cannot be found), set the tasks to failed directly.
				logger.Errorf("jobInstanceId:%d, batch dispatch tasks error, worker=%s, size:%d, error=%s", m.GetJobInstanceInfo().GetJobInstanceId(), workerIdAddr, len(reqs), err.Error())
				m.batchUpdateTaskStatus(workerId, workerAddr, reqs)
			}
		}
	}
}