in runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulationScheduler.java [341:405]
/**
 * Processes a task state report attributed to an executor.
 * In the simulation this stands in for the MasterControlMessageReceiver + handleControlMessage path
 * that ends up in onTaskStateChanged.
 *
 * The new state is first recorded in the plan state manager, then the state-specific handler runs,
 * and finally (for COMPLETE, ON_HOLD and SHOULD_RETRY only) scheduling is re-triggered where needed and
 * the task dispatcher is told that an executor slot is available.
 *
 * NOTE(review): {@code attemptIdx} and {@code taskPutOnHold} are not read by this method - confirm
 * that skipping an attempt-index check is intended here.
 *
 * @param executorId    the id of the executor the report is attributed to.
 * @param taskId        the id of the task whose state changed.
 * @param attemptIdx    the attempt index of the task (currently unused).
 * @param newState      the state the task has moved to.
 * @param taskPutOnHold the id of the task put on hold, if any (currently unused).
 * @param failureCause  the cause of the failure, forwarded to the handler when the state is SHOULD_RETRY.
 */
public synchronized void onTaskStateReportFromExecutor(final String executorId,
        final String taskId,
        final int attemptIdx,
        final TaskState.State newState,
        @Nullable final String taskPutOnHold,
        final TaskState.RecoverableTaskFailureCause failureCause) {
  // Record the transition before reacting to it.
  planStateManager.onTaskStateChanged(taskId, newState);

  // Step 1: state-specific handling. Every state other than COMPLETE, SHOULD_RETRY and ON_HOLD
  // leaves this method through an exception.
  switch (newState) {
    case COMPLETE:
      BatchSchedulerUtils.onTaskExecutionComplete(executorRegistry, executorId, taskId);
      break;
    case SHOULD_RETRY:
      // The executor reported a recoverable failure for this task.
      BatchSchedulerUtils.onTaskExecutionFailedRecoverable(
        planStateManager, blockManagerMaster, executorRegistry, executorId, taskId, failureCause);
      break;
    case ON_HOLD:
      // If the on-hold handling yields a new physical plan, switch over to it.
      BatchSchedulerUtils
        .onTaskExecutionOnHold(planStateManager, executorRegistry, planRewriter, executorId, taskId)
        .ifPresent(this::updatePlan);
      break;
    case FAILED:
      throw new UnrecoverableFailureException(new Exception(String.format("The plan failed on %s in %s",
        taskId, executorId)));
    case READY:
    case EXECUTING:
      throw new SimulationException("The states READY/EXECUTING cannot occur at this point");
    default:
      throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
  }

  // Step 2: decide whether doSchedule() has to run.
  // Only COMPLETE, ON_HOLD and SHOULD_RETRY can reach this point.
  final boolean schedulingRequired;
  if (newState == TaskState.State.SHOULD_RETRY) {
    // A retry always needs another scheduling pass.
    schedulingRequired = true;
  } else {
    // COMPLETE / ON_HOLD: schedule only when the task's stage has completed and the plan is not done yet.
    final String owningStageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
    final boolean stageCompleted =
      planStateManager.getStageState(owningStageId).equals(StageState.State.COMPLETE);
    schedulingRequired = stageCompleted && !planStateManager.isPlanDone();
  }
  if (schedulingRequired) {
    doSchedule();
  }

  // Step 3: each of the three surviving states means that a slot has been made available.
  taskDispatcher.onExecutorSlotAvailable();
}