in internal/master/parallel_task_mater.go [114:181]
// RetryTasks asks the server to set the given tasks back to the INIT status
// and, once the server confirms, adjusts the local progress counters.
//
// Root tasks cannot be retried: they are skipped (with a warning) when the
// list of task ids for the request is built. The request is pushed onto the
// task master message channel, after which the call blocks for up to 30
// seconds waiting for the batch-update response.
//
// On a successful response:
//   - if the master is not initialized, the batch handler is started, the
//     master is initialized again, and the total of the progress counter of
//     every entity's task name is incremented by one (the counter is created
//     first when it does not exist yet);
//   - otherwise, for every entity whose old status was succeed or failed, the
//     matching count is decremented on the per-task-name counter and on the
//     per-worker counter, when those counters exist.
//
// A failed response or a timeout is only logged; nothing is reported back to
// the caller.
func (m *ParallelTaskMaster) RetryTasks(taskEntities []*schedulerx.RetryTaskEntity) {
	// Collect the ids of the tasks whose status should be updated to INIT.
	taskIdList := make([]int64, 0, len(taskEntities))
	for _, taskEntity := range taskEntities {
		if utils.IsRootTask(taskEntity.GetTaskName()) {
			logger.Warnf("root task can't retry")
			continue
		}
		taskIdList = append(taskIdList, taskEntity.GetTaskId())
	}

	jobInstanceId := m.jobInstanceInfo.GetJobInstanceId()
	req := &schedulerx.WorkerReportTaskListStatusRequest{
		JobInstanceId: proto.Int64(jobInstanceId),
		TaskId:        taskIdList,
		Status:        proto.Int32(int32(taskstatus.TaskStatusInit)),
	}

	// Send to server by master.
	actorcomm.TaskMasterMsgReceiver() <- &actorcomm.SchedulerWrappedMsg{
		Msg: req,
	}

	// Wait at most 30 seconds for the response.
	timer := time.NewTimer(30 * time.Second)
	defer timer.Stop()

	select {
	case resp := <-actorcomm.WorkerBatchUpdateTaskStatusRespMsgSender():
		if !resp.GetSuccess() {
			// The server answered but rejected the update; this is not a
			// timeout, so it is logged distinctly from the timer branch below.
			logger.Errorf("RetryTasks in ParallelTaskMaster failed, jobInstanceId=%d, errMsg=%s", jobInstanceId, resp.GetMessage())
			// TODO: when sending fails, another server should be tried.
			return
		}

		if !m.IsInited() {
			// If it has not been initialized, re-initialize it.
			m.startBatchHandler()
			m.init()
			for _, taskEntity := range taskEntities {
				taskName := taskEntity.GetTaskName()
				// Count the retried task whether its counter already existed
				// or has just been created by LoadOrStore; otherwise a task
				// name first seen here would keep a total of zero.
				counter, _ := m.taskProgressMap.LoadOrStore(taskName, common.NewTaskProgressCounter(taskName))
				if taskCounter, ok := counter.(*common.TaskProgressCounter); ok {
					taskCounter.IncrementOneTotal()
				}
			}
			return
		}

		// Already initialized: take the previous outcome of each retried task
		// out of the per-task-name and per-worker statistics.
		for _, taskEntity := range taskEntities {
			oldStatus := taskstatus.TaskStatus(taskEntity.GetOldStatus())

			if taskProgressCounter, ok := m.taskProgressMap.Load(taskEntity.GetTaskName()); ok {
				switch oldStatus {
				case taskstatus.TaskStatusSucceed:
					taskProgressCounter.(*common.TaskProgressCounter).DecrementSuccess()
				case taskstatus.TaskStatusFailed:
					taskProgressCounter.(*common.TaskProgressCounter).DecrementFailed()
				}
			}

			if workerProgressCounter, ok := m.workerProgressMap.Load(taskEntity.GetWorkerAddr()); ok {
				switch oldStatus {
				case taskstatus.TaskStatusSucceed:
					workerProgressCounter.(*common.WorkerProgressCounter).DecrementSuccess()
				case taskstatus.TaskStatusFailed:
					workerProgressCounter.(*common.WorkerProgressCounter).DecrementFailed()
				}
			}
		}
	case <-timer.C:
		logger.Errorf("RetryTasks in ParallelTaskMaster timeout, jobInstanceId=%d", jobInstanceId)
	}
}