public synchronized List getTaskAttemptsToSchedule()

in runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java [185:236]


  /**
   * Collects the ids of the task attempts of the given stage that are currently in the READY state,
   * creating new attempts first where a task index has fewer not-done attempts than it should.
   *
   * <p>Side effects: new {@link TaskState} entries may be appended to the per-task-index attempt lists held in
   * {@code stageIdToTaskIdxToAttemptStates}, and an (initially empty) entry for the stage is created in
   * {@code stageIdToTaskIndexToNumOfClones} if one is missing.
   *
   * @param stageId id of the stage whose task attempts are examined.
   * @return ids of all READY attempts of task indices that have no COMPLETE attempt yet;
   *         an empty list if the stage itself is already COMPLETE.
   * @throws RuntimeException if the number of attempts for some task index exceeds {@code maxScheduleAttempt}.
   */
  public synchronized List<String> getTaskAttemptsToSchedule(final String stageId) {
    if (getStageState(stageId).equals(StageState.State.COMPLETE)) {
      // This stage is done, so there is nothing left to schedule.
      return new ArrayList<>(0);
    }

    // For each task index....
    final List<String> taskAttemptsToSchedule = new ArrayList<>();
    final Stage stage = physicalPlan.getStageDAG().getVertexById(stageId);
    for (final int taskIndex : stage.getTaskIndices()) {
      final List<TaskState> attemptStatesForThisTaskIndex =
        stageIdToTaskIdxToAttemptStates.get(stageId).get(taskIndex);

      // If one of the attempts is COMPLETE, do not schedule
      if (attemptStatesForThisTaskIndex
        .stream()
        .noneMatch(state -> state.getStateMachine().getCurrentState().equals(TaskState.State.COMPLETE))) {

        // (Step 1) Create new attempts (expected to start out READY), as many as
        // # of numOfConcurrentAttempts(including clones) - # of 'not-done' attempts
        stageIdToTaskIndexToNumOfClones.putIfAbsent(stageId, new HashMap<>());
        final Optional<ClonedSchedulingProperty.CloneConf> cloneConf =
          stage.getPropertyValue(ClonedSchedulingProperty.class);
        final int numOfConcurrentAttempts = cloneConf.isPresent() && cloneConf.get().isUpFrontCloning()
          // For now we support up to 1 clone (2 concurrent = 1 original + 1 clone)
          ? 2
          // Otherwise use the number recorded for this task index, defaulting to no cloning (= 1 concurrent).
          // The inner map is keyed by task index (per its name), so the lookup must use taskIndex:
          // looking it up with the String stageId never matches and always yields the default.
          : stageIdToTaskIndexToNumOfClones.get(stageId).getOrDefault(taskIndex, 1);
        final long numOfNotDoneAttempts = attemptStatesForThisTaskIndex.stream().filter(this::isTaskNotDone).count();
        for (int i = 0; i < numOfConcurrentAttempts - numOfNotDoneAttempts; i++) {
          attemptStatesForThisTaskIndex.add(new TaskState());
        }

        // (Step 2) Check max attempt
        if (attemptStatesForThisTaskIndex.size() > maxScheduleAttempt) {
          throw new RuntimeException(
            attemptStatesForThisTaskIndex.size() + " exceeds max attempt " + maxScheduleAttempt);
        }

        // (Step 3) Return all READY attempts; the position in the list is used as the attempt number.
        for (int attempt = 0; attempt < attemptStatesForThisTaskIndex.size(); attempt++) {
          if (attemptStatesForThisTaskIndex.get(attempt).getStateMachine().getCurrentState()
            .equals(TaskState.State.READY)) {
            taskAttemptsToSchedule.add(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt));
          }
        }

      }
    }

    return taskAttemptsToSchedule;
  }