func()

in internal/master/standalone_task_master.go [82:145]


func (m *StandaloneTaskMaster) SubmitInstance(ctx context.Context, jobInstanceInfo *common.JobInstanceInfo) error {
	var (
		err      error
		uniqueId string
		taskId   int64

		workerId   = utils.GetWorkerId()
		workerAddr = m.GetCurrentSelection()
	)
	defer func() {
		if err != nil {
			logger.Errorf("Standalone taskMaster submitInstance failed, workerAddr=%s, uniqueId=%s, err=%s", workerAddr, uniqueId, err.Error())
			m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusFailed)
			failedReq := &schedulerx.ContainerReportTaskStatusRequest{
				JobId:         proto.Int64(jobInstanceInfo.GetJobId()),
				JobInstanceId: proto.Int64(jobInstanceInfo.GetJobInstanceId()),
				TaskId:        proto.Int64(taskId),
				Status:        proto.Int32(int32(taskstatus.TaskStatusFailed)),
				WorkerId:      proto.String(workerId),
				WorkerAddr:    proto.String(workerAddr),
				SerialNum:     proto.Int64(m.serialNum.Load()),
			}
			m.UpdateTaskStatus(failedReq)
		}
	}()

	taskId = m.AcquireTaskId()
	uniqueId = utils.GetUniqueId(jobInstanceInfo.GetJobId(), jobInstanceInfo.GetJobInstanceId(), taskId)
	req, err := m.convert2StartContainerRequest(jobInstanceInfo, taskId, "", nil, false)
	if err != nil {
		logger.Errorf("SubmitInstance failed, jobInstanceInfo=%+v, err=%s.", jobInstanceInfo, err.Error())
		m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusFailed)
		return err
	}

	// If task execution distribution is turned on for second-level tasks, the execution machine will be selected in polling.
	if config.GetWorkerConfig().IsDispatchSecondDelayStandalone() && common.TimeType(jobInstanceInfo.GetTimeType()) == common.TimeTypeSecondDelay {
		workerIdAddr := m.selectWorker()
		workerInfo := strings.Split(workerIdAddr, "@")
		workerId = workerInfo[0]
		workerAddr = actorcomm.GetRealWorkerAddr(workerIdAddr)
		m.currentSelection = workerAddr
	}

	response, e := m.actorContext.RequestFuture(actorcomm.GetContainerRouterPid(m.currentSelection), req, 10*time.Second).Result()
	if e != nil {
		err = fmt.Errorf("request to containerPid failed, err=%s", e.Error())
		return err
	}
	resp, ok := response.(*schedulerx.MasterStartContainerResponse)
	if !ok {
		m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusFailed)
		err = fmt.Errorf("response is not MasterStartContainerResponse, resp=%+v", response)
		return err
	}
	if resp.GetSuccess() {
		m.taskStatusMap.Store(uniqueId, taskstatus.TaskStatusInit)
		logger.Infof("Standalone taskMaster init worker succeed, workerAddr=%s, uniqueId=%s", workerAddr, uniqueId)
		return nil
	}

	err = fmt.Errorf("start container request failed: %s", resp.GetMessage())
	return err
}