in internal/actor/job_instance_actor.go [264:300]
// handleRetryTasks processes a ServerRetryTasksRequest carried by msg.
//
// When a task master for the job instance is already present in
// a.taskmasterPool and it implements taskmaster.ParallelTaskMaster, the retry
// entities from the request are handed to it and a successful
// ServerRetryTasksResponse is sent back through actorCtx. An existing task
// master of any other kind is left untouched and no response is sent.
//
// When no task master is present, a new one matching the job instance's
// execute mode is created and registered in the task master pool; an unknown
// execute mode is logged and nothing is registered. No response is sent on
// this path.
func (a *jobInstanceActor) handleRetryTasks(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) {
	req := msg.Msg.(*schedulerx.ServerRetryTasksRequest)
	logger.Infof("handleRetryTasks, jobInstanceId=%d", req.GetJobInstanceId())

	jobInstanceInfo := convertServerRetryTasksRequest2JobInstanceInfo(req)
	jobInstanceId := jobInstanceInfo.GetJobInstanceId()

	if existing := a.taskmasterPool.Get(jobInstanceId); existing != nil {
		// Only parallel-capable task masters support retrying individual tasks.
		if parallelTaskMaster, ok := existing.(taskmaster.ParallelTaskMaster); ok {
			parallelTaskMaster.RetryTasks(req.GetRetryTaskEntity())
			resp := &schedulerx.ServerRetryTasksResponse{
				Success: proto.Bool(true),
			}
			actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath))
		}
		return
	}

	// No task master is registered for this job instance: build one that
	// matches the requested execute mode.
	var newTaskMaster taskmaster.TaskMaster
	switch common.ExecuteMode(jobInstanceInfo.GetExecuteMode()) {
	case common.StandaloneExecuteMode:
		newTaskMaster = master.NewStandaloneTaskMaster(jobInstanceInfo, actorCtx)
	case common.BroadcastExecuteMode:
		newTaskMaster = master.NewBroadcastTaskMaster(jobInstanceInfo, actorCtx)
	case common.BatchExecuteMode:
		newTaskMaster = master.NewBatchTaskMaster(jobInstanceInfo, actorCtx)
	case common.ParallelExecuteMode:
		newTaskMaster = master.NewParallelTaskMaster(jobInstanceInfo, actorCtx)
	case common.GridExecuteMode:
		newTaskMaster = master.NewGridTaskMaster(jobInstanceInfo, actorCtx)
	case common.ShardingExecuteMode:
		// Must be stored in newTaskMaster like every other mode; otherwise
		// the sharding task master is never registered below.
		newTaskMaster = master.NewShardingTaskMaster(jobInstanceInfo, actorCtx)
	default:
		logger.Errorf("handleRetryTasks failed, jobInstanceId=%d, unknown executeMode=%s", jobInstanceId, jobInstanceInfo.GetExecuteMode())
	}
	if newTaskMaster != nil {
		// NOTE(review): the lookup above goes through a.taskmasterPool while
		// registration uses masterpool.GetTaskMasterPool(); presumably both
		// refer to the same pool — confirm.
		masterpool.GetTaskMasterPool().Put(jobInstanceId, newTaskMaster)
	}
}