in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/StageAssignmentTraversal.java [391:459]
private void applySplittingCriteria(InterimStage stage) {
    // Refines the given stage in two phases:
    //   1. Walk the stage's tasks breadth-first from its start tasks and mark every consumer that has to be
    //      pulled out of the stage, either because a splitting criterion fires on the connecting channel or
    //      because its producer has been marked already.
    //   2. Split the marked tasks off, one connected component at a time, and do the same for the unmarked
    //      rest. Every stage created this way is refined recursively.
    // TODO: This splitting mechanism can cause unnecessary fragmentation of stages. Most likely, because the
    // "is the producer already marked" flag depends on the traversal order of the stage DAG.

    // ExecutionTasks that are to be separated from all tasks that are not in this Set.
    final Set<ExecutionTask> markedForSeparation = new HashSet<>();
    // ExecutionTasks whose outgoing Channels have been followed already.
    final Set<ExecutionTask> expanded = new HashSet<>();
    // Work list of ExecutionTasks that still need to be checked, seeded with the stage's start tasks.
    final Queue<ExecutionTask> pending = new LinkedList<>(stage.getStartTasks());

    while (!pending.isEmpty()) {
        final ExecutionTask producer = pending.poll();

        // A task can be enqueued several times; only its first visit is processed.
        if (!expanded.add(producer)) {
            continue;
        }

        // The mark is read once per producer, before any of its consumers is inspected.
        final boolean isProducerMarked = markedForSeparation.contains(producer);

        for (Channel outChannel : producer.getOutputChannels()) {
            for (ExecutionTask consumer : outChannel.getConsumers()) {
                // Consumers that belong to a different stage need no split.
                if (this.assignedInterimStages.get(consumer) != stage) {
                    continue;
                }

                // The criteria are consulted only if the producer itself is not marked.
                final boolean isSplitRequested = isProducerMarked
                        || this.splittingCriteria.stream().anyMatch(
                        rule -> rule.shouldSplit(producer, outChannel, consumer)
                );

                if (isSplitRequested) {
                    if (consumer.isFeedbackInput(outChannel)) {
                        // TODO: Use marks to implement same-stage splits.
                        // outChannel.setStageExecutionBarrier(true);
                        // For now, the consumer is neither marked nor enqueued via this channel.
                        continue;
                    }
                    markedForSeparation.add(consumer);
                }

                pending.add(consumer);
            }
        }
    }

    // Without marked tasks, the stage stays as it is.
    if (markedForSeparation.isEmpty()) {
        return;
    }

    assert markedForSeparation.size() < stage.getTasks().size() : String.format(
            "Cannot separate all tasks from stage with tasks %s.", markedForSeparation
    );

    // Determine the unmarked tasks up front, before any split is performed.
    final Set<ExecutionTask> remainder = new HashSet<>(stage.getTasks());
    remainder.removeAll(markedForSeparation);

    // Create a stage for each connected component of the marked tasks.
    // NOTE(review): presumably separateConnectedComponent removes the returned component from the Set it is
    // given; both loops below rely on the Set shrinking -- confirm against that helper.
    while (!markedForSeparation.isEmpty()) {
        final Set<ExecutionTask> markedComponent = this.separateConnectedComponent(markedForSeparation);
        final InterimStage splitOffStage = this.splitStage(stage, markedComponent);
        this.applySplittingCriteria(splitOffStage);
    }

    // Also split the remainder into connected components. A component that leaves the remainder empty is
    // not split off, which avoids "splitting" when the remainder is already a single connected component.
    Set<ExecutionTask> keptComponent = this.separateConnectedComponent(remainder);
    while (!remainder.isEmpty()) {
        final InterimStage splitOffStage = this.splitStage(stage, keptComponent);
        this.applySplittingCriteria(splitOffStage);
        keptComponent = this.separateConnectedComponent(remainder);
    }
}