in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/StageAssignmentTraversal.java [591:632]
/**
 * Splits the given stage if its tasks do not all have the same number of required stages.
 * <p>
 * The tasks with the smallest number of required stages stay together; every other task is
 * separated. Both groups are handed to {@code separateConnectedComponent} piece by piece, and each
 * extracted piece is passed to {@code splitStage} - except for the last piece of the retained
 * group, which is left in the original stage.
 *
 * @param stage the stage to inspect; its split mark is consumed by this call
 * @return {@code true} if at least one task was separated from the stage, {@code false} if the
 * stage was not marked or no separable tasks were found
 */
private boolean partitionStage(InterimStage stage) {
    // Nothing to do for unmarked stages: their required stages did not change.
    final boolean wasMarked = stage.getAndResetSplitMark();
    if (!wasMarked) {
        return false;
    }

    // Find the tasks with the fewest required stages; all others are candidates for separation.
    int fewestRequired = -1;
    final Collection<ExecutionTask> minimalTasks = new LinkedList<>();
    final Set<ExecutionTask> separableTasks = new HashSet<>();
    for (ExecutionTask candidate : stage.getTasks()) {
        final int numRequired = this.requiredStages.get(candidate).size();
        final boolean isNewMinimum = fewestRequired == -1 || numRequired < fewestRequired;
        if (isNewMinimum) {
            // The tasks collected so far are no longer minimal, so they become separable.
            separableTasks.addAll(minimalTasks);
            minimalTasks.clear();
            fewestRequired = numRequired;
        }
        if (numRequired == fewestRequired) {
            minimalTasks.add(candidate);
        } else {
            separableTasks.add(candidate);
        }
    }

    if (separableTasks.isEmpty()) {
        logger.debug("No separable tasks found in marked stage {}.", stage);
        return false;
    }

    // Remember which tasks are not being separated before the stage is modified.
    final HashSet<ExecutionTask> remainingTasks = new HashSet<>(stage.getTasks());
    remainingTasks.removeAll(separableTasks);

    // Split off the separable tasks, one connected component at a time.
    // The set is known to be non-empty here, so at least one split takes place.
    while (!separableTasks.isEmpty()) {
        final Set<ExecutionTask> separated = this.separateConnectedComponent(separableTasks);
        this.splitStage(stage, separated);
    }

    // Likewise break the remainder into connected components. The component that empties the
    // remainder is not split off, so that it stays in the original stage.
    Set<ExecutionTask> retained = this.separateConnectedComponent(remainingTasks);
    while (!remainingTasks.isEmpty()) {
        this.splitStage(stage, retained);
        retained = this.separateConnectedComponent(remainingTasks);
    }
    return true;
}