func()

in internal/master/parallel_task_mater.go [114:181]


func (m *ParallelTaskMaster) RetryTasks(taskEntities []*schedulerx.RetryTaskEntity) {
	// update tasks' status to INIT
	taskIdList := make([]int64, 0, len(taskEntities))
	for _, taskEntity := range taskEntities {
		if utils.IsRootTask(taskEntity.GetTaskName()) {
			logger.Warnf("root task can't retry")
		} else {
			taskIdList = append(taskIdList, taskEntity.GetTaskId())
		}
	}
	req := &schedulerx.WorkerReportTaskListStatusRequest{
		JobInstanceId: proto.Int64(m.jobInstanceInfo.GetJobInstanceId()),
		TaskId:        taskIdList,
		Status:        proto.Int32(int32(taskstatus.TaskStatusInit)),
	}

	// Report the new status to the server through the task master's message channel
	actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: req,
	}

	// Wait up to 30 seconds for the server's response
	timer := time.NewTimer(30 * time.Second)
	defer timer.Stop()
	select {
	case resp := <-actorcomm.WorkerBatchUpdateTaskStatusRespMsgSender():
		if resp.GetSuccess() {
			if !m.IsInited() {
				// The master has not been initialized yet: start the batch handler,
				// initialize it, and rebuild the task progress counters.
				m.startBatchHandler()
				m.init()
				for _, taskEntity := range taskEntities {
					taskName := taskEntity.GetTaskName()
					// Count every retried task in its per-task-name total, creating
					// the counter on first sight of the task name.
					counter, _ := m.taskProgressMap.LoadOrStore(taskName, common.NewTaskProgressCounter(taskName))
					counter.(*common.TaskProgressCounter).IncrementOneTotal()
				}
			} else {
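				// The master is already initialized: roll the retried tasks back
				// out of the per-task and per-worker success/failure counters.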
				for _, taskEntity := range taskEntities {
					taskName := taskEntity.GetTaskName()
					workerAddr := taskEntity.GetWorkerAddr()
					oldStatus := taskEntity.GetOldStatus()
					if taskProgressCounter, ok := m.taskProgressMap.Load(taskName); ok {
						switch taskstatus.TaskStatus(oldStatus) {
						case taskstatus.TaskStatusSucceed:
							taskProgressCounter.(*common.TaskProgressCounter).DecrementSuccess()
						case taskstatus.TaskStatusFailed:
							taskProgressCounter.(*common.TaskProgressCounter).DecrementFailed()
						}
					}
					if workerProgressCounter, ok := m.workerProgressMap.Load(workerAddr); ok {
						switch taskstatus.TaskStatus(oldStatus) {
						case taskstatus.TaskStatusSucceed:
							workerProgressCounter.(*common.WorkerProgressCounter).DecrementSuccess()
						case taskstatus.TaskStatusFailed:
							workerProgressCounter.(*common.WorkerProgressCounter).DecrementFailed()
						}
					}
				}
			}
		} else {
			logger.Errorf("RetryTasks in ParallelTaskMaster timeout, jobInstanceId=%d, errMsg=%s", m.jobInstanceInfo.GetJobInstanceId(), resp.GetMessage())
			//TODO 发送失败应该尝试另一个server
		}
	case <-timer.C:
		logger.Errorf("RetryTasks in ParallelTaskMaster timeout, jobInstanceId=%d", m.jobInstanceInfo.GetJobInstanceId())
	}
}
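
For reference, a minimal, self-contained sketch of the timeout-guarded receive used above (the select over the response channel versus timer.C). waitForAck, ackCh, and the bool acknowledgement are illustrative placeholders, not part of the schedulerx worker API.

package main

import (
	"fmt"
	"time"
)

// waitForAck mirrors the response-wait pattern in RetryTasks: block on a
// response channel, but give up after the given timeout so a lost reply
// cannot hang the caller. The channel and payload are placeholders.
func waitForAck(ackCh <-chan bool, timeout time.Duration) (bool, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer even if the ack arrives first

	select {
	case ok := <-ackCh:
		return ok, nil
	case <-timer.C:
		return false, fmt.Errorf("no ack within %s", timeout)
	}
}

func main() {
	ackCh := make(chan bool, 1)
	go func() {
		time.Sleep(100 * time.Millisecond)
		ackCh <- true // simulated server acknowledgement
	}()

	fmt.Println(waitForAck(ackCh, 30*time.Second))
}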