public synchronized void onTaskStateChanged()

in runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java [311:381]


  /**
   * Applies a state transition to the given task and propagates its consequences to the task's stage.
   *
   * <p>In order, this method:
   * <ol>
   *   <li>records the transition as a {@code TaskMetric} event and triggers a metric broadcast,</li>
   *   <li>moves the task's state machine to {@code newTaskState},</li>
   *   <li>logs how many task indices of the stage are still remaining when the task becomes COMPLETE,</li>
   *   <li>records the start time (on EXECUTING) or the elapsed time (on COMPLETE) of the task,
   *       which is kept for speculative execution,</li>
   *   <li>updates the stage state: SHOULD_RETRY marks the stage INCOMPLETE unless a peer attempt of the same
   *       task index is already COMPLETE; COMPLETE/ON_HOLD marks the stage COMPLETE once every task index of
   *       the stage has at least one attempt that is ON_HOLD or COMPLETE.</li>
   * </ol>
   *
   * @param taskId       the ID of the task whose state changed.
   * @param newTaskState the state the task is transitioning to.
   * @throws RuntimeException if the task's state machine rejects the transition
   *                          (wraps the {@code IllegalStateTransitionException}).
   * @throws UnknownExecutionStateException if {@code newTaskState} is not one of the states handled below.
   */
  public synchronized void onTaskStateChanged(final String taskId, final TaskState.State newTaskState) {
    // Change task state
    final StateMachine taskState = getTaskStateHelper(taskId).getStateMachine();
    LOG.debug("Task State Transition: id {}, from {} to {}", taskId, taskState.getCurrentState(), newTaskState);

    // NOTE(review): the metric event is recorded and broadcast before the transition is validated below,
    // so an illegal transition still emits an event - confirm this ordering is intended.
    metricStore.getOrCreateMetric(TaskMetric.class, taskId)
      .addEvent((TaskState.State) taskState.getCurrentState(), newTaskState);
    metricStore.triggerBroadcast(TaskMetric.class, taskId);

    try {
      taskState.setState(newTaskState);
    } catch (IllegalStateTransitionException e) {
      throw new RuntimeException(taskId + " - Illegal task state transition ", e);
    }

    // A task index counts as completed as soon as any one of its attempts is ON_HOLD or COMPLETE.
    // This is computed after the transition above, so it already reflects the new state of this task.
    final String stageId = RuntimeIdManager.getStageIdFromTaskId(taskId);
    final Map<Integer, List<TaskState>> taskStatesOfThisStage = stageIdToTaskIdxToAttemptStates.get(stageId);
    final long numOfCompletedTaskIndicesInThisStage = taskStatesOfThisStage.values().stream()
      .filter(attempts -> attempts.stream()
        .map(attempt -> (TaskState.State) attempt.getStateMachine().getCurrentState())
        .anyMatch(curState -> curState == TaskState.State.ON_HOLD || curState == TaskState.State.COMPLETE))
      .count();

    // Log not-yet-completed tasks for us humans to track progress
    if (newTaskState == TaskState.State.COMPLETE) {
      final int numOfTaskIndicesInThisStage = taskStatesOfThisStage.size();
      LOG.info("{} completed: {} Task(s) out of {} are remaining in this stage",
        taskId, numOfTaskIndicesInThisStage - numOfCompletedTaskIndicesInThisStage, numOfTaskIndicesInThisStage);
    }

    // Maintain info for speculative execution
    if (newTaskState == TaskState.State.EXECUTING) {
      taskIdToStartTimeMs.put(taskId, System.currentTimeMillis());
    } else if (newTaskState == TaskState.State.COMPLETE) {
      // Relies on a start time having been recorded when this task entered EXECUTING.
      final long elapsedTimeMs = System.currentTimeMillis() - taskIdToStartTimeMs.get(taskId);
      stageIdToCompletedTaskTimeMsList.computeIfAbsent(stageId, id -> new ArrayList<>()).add(elapsedTimeMs);
    }

    // Change stage state, if needed
    switch (newTaskState) {
      // INCOMPLETE stage
      case SHOULD_RETRY:
        final boolean isAPeerAttemptCompleted = getPeerAttemptsForTheSameTaskIndex(taskId).stream()
          .anyMatch(state -> state.equals(TaskState.State.COMPLETE));
        if (!isAPeerAttemptCompleted) {
          // None of the peers has completed, hence this stage is incomplete
          onStageStateChanged(stageId, StageState.State.INCOMPLETE);
        }
        break;

      // COMPLETE stage
      case COMPLETE:
      case ON_HOLD:
        // The stage is complete only when every task index declared by the physical plan is accounted for.
        if (numOfCompletedTaskIndicesInThisStage
          == physicalPlan.getStageDAG().getVertexById(stageId).getTaskIndices().size()) {
          onStageStateChanged(stageId, StageState.State.COMPLETE);
        }
        break;

      // Doesn't affect StageState
      case READY:
      case EXECUTING:
      case FAILED:
        break;
      default:
        throw new UnknownExecutionStateException(new Throwable("This task state is unknown"));
    }
  }