func()

in internal/master/persistence/server_task_persistence.go [157:215]


// CreateTasks reports a batch of newly created tasks to the server and waits
// for the server's acknowledgement.
//
// One WorkerCreateTaskRequest is built per entry in containers. The batch is
// tagged with the job instance id of the first container plus workerId and
// workerAddr, then pushed onto the task-master message channel. The call then
// blocks for up to 90 seconds waiting for a batch-create response.
//
// It returns an error when containers is empty, when a task body is not valid
// JSON, when a label map cannot be marshalled, when the server reports a
// failure, or when no response arrives before the timeout.
func (rcvr *ServerTaskPersistence) CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId string, workerAddr string) error {
	if len(containers) == 0 {
		return fmt.Errorf("createTasks container list empty")
	}

	// The whole batch is attributed to the first container's job instance.
	jobInstanceId := containers[0].GetJobInstanceId()
	isAdvancedVersion := rcvr.groupManager.IsAdvancedVersion(rcvr.groupId)

	batchReqs := new(schedulerx.WorkerBatchCreateTasksRequest)
	for _, taskInfo := range containers {
		req := &schedulerx.WorkerCreateTaskRequest{
			JobId:         proto.Int64(taskInfo.GetJobId()),
			JobInstanceId: proto.Int64(taskInfo.GetJobInstanceId()),
			TaskId:        proto.Int64(taskInfo.GetTaskId()),
			TaskName:      proto.String(taskInfo.GetTaskName()),
			TaskBody:      taskInfo.GetTask(),
		}

		// The task body must be valid JSON regardless of the group version.
		var task interface{}
		if err := json.Unmarshal(taskInfo.GetTask(), &task); err != nil {
			return fmt.Errorf("json unmarshal TaskBody failed, err=%w", err)
		}

		// NOTE(review): json.Unmarshal into a bare interface{} yields only maps,
		// slices and scalars, so this assertion looks like it can never succeed
		// and LabelMap would never be populated — confirm against bizsubtask.
		bizSubTask, ok := task.(bizsubtask.BizSubTask)
		if isAdvancedVersion && ok {
			labels, err := json.Marshal(bizSubTask.LabelMap())
			if err != nil {
				return fmt.Errorf("json marshal TaskBody's labelMap failed, err=%w", err)
			}
			req.LabelMap = proto.String(string(labels))
		}
		batchReqs.Task = append(batchReqs.Task, req)
	}
	batchReqs.JobInstanceId = proto.Int64(jobInstanceId)
	batchReqs.WorkerId = proto.String(workerId)
	batchReqs.WorkerAddr = proto.String(workerAddr)

	// Send to server by master
	actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: batchReqs,
	}

	// Wait up to 90 seconds for the server's answer.
	timer := time.NewTimer(90 * time.Second)
	defer timer.Stop()
	select {
	case resp := <-actorcomm.WorkerBatchCreateTasksRespMsgSender():
		if !resp.GetSuccess() {
			// Build the error with a constant format string: the server-supplied
			// message may contain '%' and must not be interpreted as a format.
			err := fmt.Errorf("batch create tasks error, JobInstanceId=%d, reason=%s.", jobInstanceId, resp.GetMessage())
			logger.Errorf("%s", err.Error())
			return err
		}
		logger.Infof("batch create tasks to server succeed, JobInstanceId=%d, size=%d", jobInstanceId, len(containers))
		return nil
	case <-timer.C:
		// No acknowledgement arrived: surface it instead of reporting success.
		logger.Errorf("CreateTasks of JobInstanceId=%d in ServerTaskPersistence timeout", jobInstanceId)
		return fmt.Errorf("batch create tasks timeout, JobInstanceId=%d", jobInstanceId)
	}
}