in runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java [311:381]
public synchronized void onTaskStateChanged(final String taskId, final TaskState.State newTaskState) {
  // Moves the given task attempt to newTaskState, records the transition as a TaskMetric event,
  // maintains the timing info used for speculative execution, and propagates the change to the
  // stage state when needed.
  // Throws RuntimeException if the task state machine rejects the transition, and
  // UnknownExecutionStateException for a state this method does not handle.

  // Change task state. The previous state is captured up front so that the metric recorded below
  // describes exactly the transition that was applied.
  final StateMachine taskState = getTaskStateHelper(taskId).getStateMachine();
  final TaskState.State previousTaskState = (TaskState.State) taskState.getCurrentState();
  LOG.debug("Task State Transition: id {}, from {} to {}", taskId, previousTaskState, newTaskState);
  try {
    taskState.setState(newTaskState);
  } catch (IllegalStateTransitionException e) {
    throw new RuntimeException(taskId + " - Illegal task state transition ", e);
  }
  // Record (and broadcast) the metric only after the state machine accepted the transition, so an
  // illegal transition is never reported as if it had happened.
  metricStore.getOrCreateMetric(TaskMetric.class, taskId).addEvent(previousTaskState, newTaskState);
  metricStore.triggerBroadcast(TaskMetric.class, taskId);

  final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);

  // Maintain info for speculative execution
  if (newTaskState == TaskState.State.EXECUTING) {
    taskIdToStartTimeMs.put(taskId, System.currentTimeMillis());
  } else if (newTaskState == TaskState.State.COMPLETE) {
    final Long startTimeMs = taskIdToStartTimeMs.get(taskId);
    if (startTimeMs == null) {
      // Unboxing a missing start time would otherwise fail with an uninformative NullPointerException
      LOG.warn("{} completed without a recorded start time; its duration is not recorded", taskId);
    } else {
      stageIdToCompletedTaskTimeMsList.computeIfAbsent(stageId, id -> new ArrayList<>())
        .add(System.currentTimeMillis() - startTimeMs);
    }
  }

  // Change stage state, if needed
  switch (newTaskState) {
    // INCOMPLETE stage
    case SHOULD_RETRY:
      // Only if none of the peer attempts for the same task index has completed is this stage incomplete
      if (!getPeerAttemptsForTheSameTaskIndex(taskId).contains(TaskState.State.COMPLETE)) {
        onStageStateChanged(stageId, StageState.State.INCOMPLETE);
      }
      break;
    // COMPLETE stage: reached when every task index has a COMPLETE or ON_HOLD attempt
    case COMPLETE:
    case ON_HOLD: {
      // The count is only needed here, so it is not recomputed for transitions that never use it
      final Map<Integer, List<TaskState>> taskStatesOfThisStage = stageIdToTaskIdxToAttemptStates.get(stageId);
      final long numOfCompletedTaskIndicesInThisStage = countDoneTaskIndices(taskStatesOfThisStage);
      if (newTaskState == TaskState.State.COMPLETE) {
        // Log not-yet-completed tasks for us humans to track progress
        LOG.info("{} completed: {} Task(s) out of {} are remaining in this stage",
          taskId, taskStatesOfThisStage.size() - numOfCompletedTaskIndicesInThisStage, taskStatesOfThisStage.size());
      }
      if (numOfCompletedTaskIndicesInThisStage
        == physicalPlan.getStageDAG().getVertexById(stageId).getTaskIndices().size()) {
        onStageStateChanged(stageId, StageState.State.COMPLETE);
      }
      break;
    }
    // Doesn't affect StageState
    case READY:
    case EXECUTING:
    case FAILED:
      break;
    default:
      throw new UnknownExecutionStateException(new Throwable("This task state is unknown: " + newTaskState));
  }
}

/**
 * Counts the task indices whose attempts include at least one in state COMPLETE or ON_HOLD.
 * Both states count as "done" when deciding whether the stage is complete.
 *
 * @param taskIdxToAttemptStates the attempt states of one stage, grouped per task index.
 * @return the number of task indices with at least one COMPLETE or ON_HOLD attempt.
 */
private static long countDoneTaskIndices(final Map<Integer, List<TaskState>> taskIdxToAttemptStates) {
  return taskIdxToAttemptStates.values().stream()
    .filter(attempts -> attempts.stream()
      .map(attempt -> (TaskState.State) attempt.getStateMachine().getCurrentState())
      .anyMatch(state -> state == TaskState.State.ON_HOLD || state == TaskState.State.COMPLETE))
    .count();
}