func()

in internal/master/broadcast_task_master.go [121:186]


// dispatchTask sends the start-container request of one broadcast task to a
// single worker and records the outcome.
//
// Parameters:
//   - jobInstanceInfo: the job instance the task belongs to; its job id and
//     job instance id are used for the unique id and for the failure report.
//   - workerIdAddr: worker identifier; the segment before the first "@" is
//     used as the worker id, and the real worker address is resolved through
//     actorcomm.GetRealWorkerAddr.
//   - taskIdMap: maps workerIdAddr to the task id assigned to that worker. A
//     missing key yields task id 0 (the map's zero value).
//
// The request is attempted up to BroadcastDispatchRetryTimes times (always at
// least once). On the first successful response the worker -> uniqueId mapping
// is stored and the function returns. If building the request fails, or every
// attempt fails, the deferred handler logs the last error, marks
// m.existInvalidWorker, and reports TaskStatusFailed via m.UpdateTaskStatus.
func (m *BroadcastTaskMaster) dispatchTask(jobInstanceInfo *common.JobInstanceInfo, workerIdAddr string, taskIdMap map[string]int64) {
	var (
		err        error
		workerAddr string
		uniqueId   string
		taskId     int64
		workerId   string
	)
	// Failure reporting lives in a defer so that every exit path with a
	// non-nil err (request conversion failure or exhausted retries) is covered.
	defer func() {
		if err == nil {
			return
		}
		logger.Errorf("broadcast taskMaster submitTask=%s to worker=%s error, errMsg=%s", uniqueId, workerAddr, err.Error())
		m.existInvalidWorker = true
		failedReq := &schedulerx.ContainerReportTaskStatusRequest{
			JobId:         proto.Int64(jobInstanceInfo.GetJobId()),
			JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()),
			TaskId:        proto.Int64(taskId),
			Status:        proto.Int32(int32(taskstatus.TaskStatusFailed)),
			WorkerId:      proto.String(workerId),
			WorkerAddr:    proto.String(workerAddr),
			SerialNum:     proto.Int64(m.serialNum.Load()),
		}
		m.UpdateTaskStatus(failedReq)
	}()

	// strings.Split always returns at least one element, so index 0 is safe
	// even when workerIdAddr contains no "@".
	workerInfo := strings.Split(workerIdAddr, "@")
	workerId = workerInfo[0]
	workerAddr = actorcomm.GetRealWorkerAddr(workerIdAddr)

	taskId = taskIdMap[workerIdAddr]
	uniqueId = utils.GetUniqueId(jobInstanceInfo.GetJobId(), jobInstanceInfo.GetJobInstanceId(), taskId)
	req, e := m.convert2StartContainerRequest(jobInstanceInfo, taskId, "", nil, false)
	if e != nil {
		err = fmt.Errorf("convert2StartContainerRequest failed, jobInstanceInfo=%+v, taskId=%+v, err=%s", jobInstanceInfo, taskId, e.Error())
		return
	}
	req.ShardingNum = proto.Int32(int32(len(m.allWorkers)))

	m.taskIdStatusMap.Store(taskId, taskstatus.TaskStatusInit)

	// A non-positive configured value would skip the loop entirely and leave
	// the task stuck in TaskStatusInit without any dispatch or failure report,
	// so always make at least one attempt.
	maxRetryTimes := int(config.GetWorkerConfig().BroadcastDispatchRetryTimes())
	if maxRetryTimes < 1 {
		maxRetryTimes = 1
	}

	for retryTimes := 0; retryTimes < maxRetryTimes; retryTimes++ {
		response, e := m.actorContext.RequestFuture(actorcomm.GetContainerRouterPid(workerAddr), req, 5*time.Second).Result()
		if e != nil {
			err = fmt.Errorf("start container failed, worker=%v, uniqueId=%v, serialNum=%v, err=%v",
				workerAddr, uniqueId, m.GetSerialNum(), e.Error())
			continue
		}
		resp, ok := response.(*schedulerx.MasterStartContainerResponse)
		if !ok {
			err = fmt.Errorf("start container failed, worker=%v, uniqueId=%v, serialNum=%v, response is not MasterStartContainerResponse, resp=%+v",
				workerAddr, uniqueId, m.GetSerialNum(), response)
			continue
		}
		if !resp.GetSuccess() {
			err = fmt.Errorf("broadcast submitTask=%v serialNum=%v to worker=%v failed, err=%v",
				uniqueId, m.GetSerialNum(), workerAddr, resp.GetMessage())
			time.Sleep(2 * time.Millisecond)
			continue
		}

		// Success: discard any error left over from earlier failed attempts,
		// otherwise the deferred handler would report this task as failed.
		err = nil
		m.worker2uniqueIdMap.Store(workerIdAddr, uniqueId)
		logger.Infof("broadcast taskMaster init succeed, worker addr is %s, uniqueId=%s", workerIdAddr, uniqueId)
		return
	}
}