in internal/master/standalone_task_master.go [82:145]
// SubmitInstance starts the single task of a standalone job instance on a worker.
//
// It acquires a new task id, converts jobInstanceInfo into a start-container
// request and sends it to the container router of the target worker, waiting
// up to 10 seconds for a *schedulerx.MasterStartContainerResponse.
//
// When IsDispatchSecondDelayStandalone is enabled and the job's time type is
// TimeTypeSecondDelay, the target worker is chosen via selectWorker and stored
// in m.currentSelection. Otherwise the selection returned by
// GetCurrentSelection at entry is used.
//
// On success the task status is stored as TaskStatusInit and nil is returned.
// On any failure, a single deferred handler does three things:
//   - logs the error,
//   - stores TaskStatusFailed for the task,
//   - reports the failure through UpdateTaskStatus.
//
// The error is then returned.
//
// NOTE(review): ctx is currently unused; the request is bounded only by the
// fixed 10s timeout.
func (m *StandaloneTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error {
	var (
		err        error
		uniqueId   string
		taskId     int64
		workerId   = utils.GetWorkerId()
		workerAddr = m.GetCurrentSelection()
	)
	// Central failure handling. Every error path below only assigns err and
	// returns, so a failure is logged, recorded and reported exactly once.
	defer func() {
		if err == nil {
			return
		}
		logger.Errorf("Standalone taskMaster submitInstance failed, workerAddr=%s, uniqueId=%s, err=%s", workerAddr, uniqueId, err.Error())
		m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusFailed)
		failedReq := &schedulerx.ContainerReportTaskStatusRequest{
			JobId:         proto.Int64(jobInstanceInfo.GetJobId()),
			JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()),
			TaskId:        proto.Int64(taskId),
			Status:        proto.Int32(int32(taskstatus.TaskStatusFailed)),
			WorkerId:      proto.String(workerId),
			WorkerAddr:    proto.String(workerAddr),
			SerialNum:     proto.Int64(m.serialNum.Load()),
		}
		m.UpdateTaskStatus(failedReq)
	}()

	taskId = m.AcquireTaskId()
	uniqueId = utils.GetUniqueId(jobInstanceInfo.GetJobId(), jobInstanceInfo.GetJobInstanceId(), taskId)

	// err is declared in this same function scope, so := assigns to it rather
	// than shadowing it. The deferred handler therefore sees a conversion failure.
	req, err := m.convert2StartContainerRequest(jobInstanceInfo, taskId, "", nil, false)
	if err != nil {
		return err
	}

	// If task execution distribution is turned on for second-level tasks, the
	// execution machine will be selected in polling.
	if config.GetWorkerConfig().IsDispatchSecondDelayStandalone() && common.TimeType(jobInstanceInfo.GetTimeType()) == common.TimeTypeSecondDelay {
		workerIdAddr := m.selectWorker()
		// Presumably formatted as "<workerId>@<addr>". strings.Split always
		// returns at least one element, so index 0 is safe even for "".
		workerId = strings.Split(workerIdAddr, "@")[0]
		workerAddr = actorcomm.GetRealWorkerAddr(workerIdAddr)
		m.currentSelection = workerAddr
	}

	// Target the local workerAddr (not a fresh read of m.currentSelection) so
	// the address contacted is the same one that is logged and reported on failure.
	response, e := m.actorContext.RequestFuture(actorcomm.GetContainerRouterPid(workerAddr), req, 10*time.Second).Result()
	if e != nil {
		err = fmt.Errorf("request to containerPid failed, err=%w", e)
		return err
	}
	resp, ok := response.(*schedulerx.MasterStartContainerResponse)
	if !ok {
		err = fmt.Errorf("response is not MasterStartContainerResponse, resp=%+v", response)
		return err
	}
	if !resp.GetSuccess() {
		err = fmt.Errorf("start container request failed: %s", resp.GetMessage())
		return err
	}
	m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusInit)
	logger.Infof("Standalone taskMaster init worker succeed, workerAddr=%s, uniqueId=%s", workerAddr, uniqueId)
	return nil
}