in runtime/master/src/main/java/org/apache/nemo/runtime/master/PlanStateManager.java [185:236]
/**
 * Collects the IDs of the task attempts of the given stage that are currently in the READY state.
 *
 * For every task index of the stage that does not yet have a COMPLETE attempt, this method:
 * (1) appends new attempts until the number of 'not-done' attempts reaches the number of
 *     concurrent attempts allowed for that task index,
 * (2) verifies that the total number of attempts does not exceed {@code maxScheduleAttempt}, and
 * (3) gathers the IDs of all attempts that are READY.
 *
 * Note that this method is not a pure query: it may add new attempt states to
 * {@code stageIdToTaskIdxToAttemptStates} and a new (empty) per-stage entry to
 * {@code stageIdToTaskIndexToNumOfClones}.
 *
 * @param stageId the ID of the stage whose task attempts are to be scheduled.
 * @return the IDs of the READY task attempts, or an empty list if the stage is already COMPLETE.
 * @throws RuntimeException if the number of attempts of a task index exceeds {@code maxScheduleAttempt}.
 */
public synchronized List<String> getTaskAttemptsToSchedule(final String stageId) {
  if (getStageState(stageId).equals(StageState.State.COMPLETE)) {
    // This stage is done, so there is nothing left to schedule.
    return new ArrayList<>(0);
  }

  final List<String> taskAttemptsToSchedule = new ArrayList<>();
  final Stage stage = physicalPlan.getStageDAG().getVertexById(stageId);

  // For each task index....
  for (final int taskIndex : stage.getTaskIndices()) {
    final List<TaskState> attemptStatesForThisTaskIndex =
      stageIdToTaskIdxToAttemptStates.get(stageId).get(taskIndex);

    // If one of the attempts is COMPLETE, this task index needs no further scheduling.
    final boolean hasCompleteAttempt = attemptStatesForThisTaskIndex
      .stream()
      .anyMatch(state -> state.getStateMachine().getCurrentState().equals(TaskState.State.COMPLETE));
    if (hasCompleteAttempt) {
      continue;
    }

    // (Step 1) Create new READY attempts, as many as
    // # of numOfConcurrentAttempts(including clones) - # of 'not-done' attempts
    stageIdToTaskIndexToNumOfClones.putIfAbsent(stageId, new HashMap<>());
    final Optional<ClonedSchedulingProperty.CloneConf> cloneConf =
      stage.getPropertyValue(ClonedSchedulingProperty.class);
    final int numOfConcurrentAttempts = cloneConf.isPresent() && cloneConf.get().isUpFrontCloning()
      // For now we support up to 1 clone (2 concurrent = 1 original + 1 clone)
      ? 2
      // Otherwise use the number recorded for this task index (1 = no clone, if nothing is recorded).
      // The inner map is keyed by the task index, so the lookup must use taskIndex; looking it up
      // with the stage ID never matches and silently falls back to the default.
      : stageIdToTaskIndexToNumOfClones.get(stageId).getOrDefault(taskIndex, 1);
    final long numOfNotDoneAttempts = attemptStatesForThisTaskIndex.stream().filter(this::isTaskNotDone).count();
    final long numOfAttemptsToCreate = numOfConcurrentAttempts - numOfNotDoneAttempts;
    for (long i = 0; i < numOfAttemptsToCreate; i++) {
      attemptStatesForThisTaskIndex.add(new TaskState());
    }

    // (Step 2) Check max attempt
    if (attemptStatesForThisTaskIndex.size() > maxScheduleAttempt) {
      throw new RuntimeException(
        attemptStatesForThisTaskIndex.size() + " exceeds max attempt " + maxScheduleAttempt);
    }

    // (Step 3) Return all READY attempts; the position in the list serves as the attempt index.
    for (int attempt = 0; attempt < attemptStatesForThisTaskIndex.size(); attempt++) {
      if (attemptStatesForThisTaskIndex.get(attempt).getStateMachine().getCurrentState()
        .equals(TaskState.State.READY)) {
        taskAttemptsToSchedule.add(RuntimeIdManager.generateTaskId(stageId, taskIndex, attempt));
      }
    }
  }
  return taskAttemptsToSchedule;
}