in internal/master/broadcast_task_master.go [121:186]
// dispatchTask sends the start-container request of one broadcast task to a
// single worker and records the outcome.
//
// workerIdAddr identifies the target worker: the text before the first "@" is
// used as the worker id, and actorcomm.GetRealWorkerAddr derives the address
// the request is sent to. taskIdMap supplies the task id assigned to that
// worker (a missing entry yields task id 0).
//
// The request is attempted up to BroadcastDispatchRetryTimes times, and always
// at least once. On the first successful response the worker -> uniqueId
// mapping is stored and the method returns. If the request cannot be built or
// every attempt fails, the deferred handler logs the last error, sets
// existInvalidWorker and reports the task as failed through UpdateTaskStatus.
func (m *BroadcastTaskMaster) dispatchTask(jobInstanceInfo *common.JobInstanceInfo, workerIdAddr string, taskIdMap map[string]int64) {
	var (
		err        error
		workerAddr string
		uniqueId   string
		taskId     int64
		workerId   string
	)
	// err holds the outcome of the whole dispatch: it must be nil whenever the
	// worker finally accepted the task, otherwise the task is reported failed.
	defer func() {
		if err != nil {
			logger.Errorf("broadcast taskMaster submitTask=%s to worker=%s error, errMsg=%s", uniqueId, workerAddr, err.Error())
			m.existInvalidWorker = true
			failedReq := &schedulerx.ContainerReportTaskStatusRequest{
				JobId:         proto.Int64(jobInstanceInfo.GetJobId()),
				JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()),
				TaskId:        proto.Int64(taskId),
				Status:        proto.Int32(int32(taskstatus.TaskStatusFailed)),
				WorkerId:      proto.String(workerId),
				WorkerAddr:    proto.String(workerAddr),
				SerialNum:     proto.Int64(m.serialNum.Load()),
			}
			m.UpdateTaskStatus(failedReq)
		}
	}()

	// strings.Split always returns at least one element, so index 0 is safe.
	workerId = strings.Split(workerIdAddr, "@")[0]
	workerAddr = actorcomm.GetRealWorkerAddr(workerIdAddr)
	taskId = taskIdMap[workerIdAddr]
	uniqueId = utils.GetUniqueId(jobInstanceInfo.GetJobId(), jobInstanceInfo.GetJobInstanceId(), taskId)

	req, convErr := m.convert2StartContainerRequest(jobInstanceInfo, taskId, "", nil, false)
	if convErr != nil {
		err = fmt.Errorf("convert2StartContainerRequest failed, jobInstanceInfo=%+v, taskId=%+v, err=%w", jobInstanceInfo, taskId, convErr)
		return
	}
	req.ShardingNum = proto.Int32(int32(len(m.allWorkers)))
	m.taskIdStatusMap.Store(taskId, taskstatus.TaskStatusInit)

	// Always try at least once: with a non-positive configured value the task
	// would otherwise stay in Init forever without ever being sent or failed.
	maxRetryTimes := int(config.GetWorkerConfig().BroadcastDispatchRetryTimes())
	if maxRetryTimes < 1 {
		maxRetryTimes = 1
	}

	for attempt := 0; attempt < maxRetryTimes; attempt++ {
		response, reqErr := m.actorContext.RequestFuture(actorcomm.GetContainerRouterPid(workerAddr), req, 5*time.Second).Result()
		if reqErr != nil {
			err = fmt.Errorf("start container failed, worker=%v, uniqueId=%v, serialNum=%v, err=%w",
				workerAddr, uniqueId, m.GetSerialNum(), reqErr)
			continue
		}

		resp, ok := response.(*schedulerx.MasterStartContainerResponse)
		if !ok {
			err = fmt.Errorf("start container failed, worker=%v, uniqueId=%v, serialNum=%v, response is not MasterStartContainerResponse, resp=%+v",
				workerAddr, uniqueId, m.GetSerialNum(), response)
			continue
		}

		if resp.GetSuccess() {
			// Discard the error left behind by earlier failed attempts so the
			// deferred handler does not report a successful dispatch as failed.
			err = nil
			m.worker2uniqueIdMap.Store(workerIdAddr, uniqueId)
			logger.Infof("broadcast taskMaster init succeed, worker addr is %s, uniqueId=%s", workerIdAddr, uniqueId)
			return
		}

		err = fmt.Errorf("broadcast submitTask=%v serialNum=%v to worker=%v failed, err=%v",
			uniqueId, m.GetSerialNum(), workerAddr, resp.GetMessage())
		// Brief pause before retrying a request the worker explicitly rejected.
		time.Sleep(2 * time.Millisecond)
	}
}