func()

in internal/actor/job_instance_actor.go [264:300]


func (a *jobInstanceActor) handleRetryTasks(actorCtx actor.Context, msg *actorcomm.SchedulerWrappedMsg) {
	req := msg.Msg.(*schedulerx.ServerRetryTasksRequest)
	logger.Infof("handleRetryTasks, jobInstanceId=%d", req.GetJobInstanceId())
	var (
		jobInstanceInfo = convertServerRetryTasksRequest2JobInstanceInfo(req)
	)
	if taskMaster := a.taskmasterPool.Get(jobInstanceInfo.GetJobInstanceId()); taskMaster != nil {
		if parallelTaskMaster, ok := taskMaster.(taskmaster.ParallelTaskMaster); ok {
			parallelTaskMaster.RetryTasks(req.GetRetryTaskEntity())
			resp := &schedulerx.ServerRetryTasksResponse{
				Success: proto.Bool(true),
			}
			actorCtx.Send(actorcomm.SchedulerxServerPid(msg.Ctx), actorcomm.WrapSchedulerxMsg(msg.Ctx, resp, msg.SenderPath))
		}
	} else {
		var newTaskMaster taskmaster.TaskMaster
		switch common.ExecuteMode(jobInstanceInfo.GetExecuteMode()) {
		case common.StandaloneExecuteMode:
			newTaskMaster = master.NewStandaloneTaskMaster(jobInstanceInfo, actorCtx)
		case common.BroadcastExecuteMode:
			newTaskMaster = master.NewBroadcastTaskMaster(jobInstanceInfo, actorCtx)
		case common.BatchExecuteMode:
			newTaskMaster = master.NewBatchTaskMaster(jobInstanceInfo, actorCtx)
		case common.ParallelExecuteMode:
			newTaskMaster = master.NewParallelTaskMaster(jobInstanceInfo, actorCtx)
		case common.GridExecuteMode:
			newTaskMaster = master.NewGridTaskMaster(jobInstanceInfo, actorCtx)
		case common.ShardingExecuteMode:
			taskMaster = master.NewShardingTaskMaster(jobInstanceInfo, actorCtx)
		default:
			logger.Errorf("handleRetryTasks failed, jobInstanceId=%d, unknown executeMode=%s", jobInstanceInfo.GetExecuteMode())
		}
		if newTaskMaster != nil {
			masterpool.GetTaskMasterPool().Put(jobInstanceInfo.GetJobInstanceId(), newTaskMaster)
		}
	}
}