in internal/master/persistence/server_task_persistence.go [157:215]
// CreateTasks reports a batch of newly created tasks to the server.
//
// It builds one WorkerCreateTaskRequest per entry in containers, wraps them in
// a WorkerBatchCreateTasksRequest tagged with workerId and workerAddr, pushes
// the batch onto the task master message channel, and then waits up to 90
// seconds for the batch-create response.
//
// An error is returned when containers is empty, when a task body cannot be
// decoded as JSON, when a label map cannot be encoded, when the server reports
// a failure, or when no response arrives before the timeout.
func (rcvr *ServerTaskPersistence) CreateTasks(containers []*schedulerx.MasterStartContainerRequest, workerId string, workerAddr string) error {
	if len(containers) == 0 {
		return fmt.Errorf("createTasks container list empty")
	}

	// The batch is tagged with the job instance id of the first container.
	jobInstanceId := containers[0].GetJobInstanceId()
	isAdvancedVersion := rcvr.groupManager.IsAdvancedVersion(rcvr.groupId)

	batchReqs := new(schedulerx.WorkerBatchCreateTasksRequest)
	// The final size is known up front, so avoid repeated slice growth.
	batchReqs.Task = make([]*schedulerx.WorkerCreateTaskRequest, 0, len(containers))
	for _, taskInfo := range containers {
		req := &schedulerx.WorkerCreateTaskRequest{
			JobId:         proto.Int64(taskInfo.GetJobId()),
			JobInstanceId: proto.Int64(taskInfo.GetJobInstanceId()),
			TaskId:        proto.Int64(taskInfo.GetTaskId()),
			TaskName:      proto.String(taskInfo.GetTaskName()),
			TaskBody:      taskInfo.GetTask(),
		}

		var task interface{}
		if err := json.Unmarshal(taskInfo.GetTask(), &task); err != nil {
			return fmt.Errorf("json unmarshal TaskBody failed, err=%w", err)
		}
		// NOTE(review): decoding into an empty interface only produces generic
		// JSON values (maps, slices, strings, numbers, bools, nil), so this
		// assertion may never succeed and LabelMap may never be set. Confirm
		// how sub tasks are meant to be decoded before relying on labels.
		if bizSubTask, ok := task.(bizsubtask.BizSubTask); ok && isAdvancedVersion {
			labels, err := json.Marshal(bizSubTask.LabelMap())
			if err != nil {
				return fmt.Errorf("json marshal TaskBody's labelMap failed, err=%w", err)
			}
			req.LabelMap = proto.String(string(labels))
		}

		batchReqs.Task = append(batchReqs.Task, req)
	}
	batchReqs.JobInstanceId = proto.Int64(jobInstanceId)
	batchReqs.WorkerId = proto.String(workerId)
	batchReqs.WorkerAddr = proto.String(workerAddr)

	// Send to server by master.
	actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: batchReqs,
	}

	// Wait at most 90 seconds for the server's answer.
	timer := time.NewTimer(90 * time.Second)
	defer timer.Stop()

	select {
	case resp := <-actorcomm.WorkerBatchCreateTasksRespMsgSender():
		if !resp.GetSuccess() {
			errMsg := fmt.Sprintf("batch create tasks error, JobInstanceId=%d, reason=%s.", jobInstanceId, resp.GetMessage())
			// The server-provided reason must never be used as a format
			// string: a stray '%' would corrupt the message.
			logger.Errorf("%s", errMsg)
			return fmt.Errorf("%s", errMsg)
		}
		logger.Infof("batch create tasks to server succeed, JobInstanceId=%d, size=%d", jobInstanceId, len(containers))
		return nil
	case <-timer.C:
		// No acknowledgement means the tasks cannot be assumed to exist on
		// the server, so surface the timeout instead of reporting success.
		logger.Errorf("CreateTasks of JobInstanceId=%d in ServerTaskPersistence timeout", jobInstanceId)
		return fmt.Errorf("batch create tasks timeout, JobInstanceId=%d", jobInstanceId)
	}
}