private void applySplittingCriteria()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/StageAssignmentTraversal.java [391:459]


    /**
     * Applies the {@code splittingCriteria} to the given {@code stage} and splits it where they demand it.
     * <p>The method works in two phases:</p>
     * <ol>
     * <li>Starting from the start tasks of the {@code stage}, the {@link ExecutionTask}s are traversed along their
     * output {@link Channel}s. A consumer within the same {@code stage} is marked for separation if its producer
     * is already marked or if any splitting criterion requests a split for the (producer, channel, consumer)
     * triple. Feedback inputs are exempted: such a consumer is neither marked nor enqueued via that edge.</li>
     * <li>If any task was marked, every connected component of the marked tasks is split off into its own stage,
     * and so is every connected component of the unmarked tasks except for the last one, which remains in the
     * {@code stage}. Each split-off stage is recursively processed by this method.</li>
     * </ol>
     *
     * @param stage the {@link InterimStage} that should be checked and, if necessary, split
     */
    private void applySplittingCriteria(InterimStage stage) {
        // TODO: This splitting mechanism can cause unnecessary fragmentation of stages. Most likely, because
        // "producerIsSeparated" depends on the traversal order of the stage DAG.

        // ExecutionTasks that have been marked for separation from the unmarked ones.
        final Set<ExecutionTask> separationMarks = new HashSet<>();
        // ExecutionTasks whose outgoing Channels have already been followed.
        final Set<ExecutionTask> expandedTasks = new HashSet<>();
        // ExecutionTasks that still need to be checked against the splitting criteria.
        final Queue<ExecutionTask> pendingTasks = new LinkedList<>(stage.getStartTasks());

        while (!pendingTasks.isEmpty()) {
            final ExecutionTask producer = pendingTasks.poll();

            // Each task is expanded at most once.
            if (!expandedTasks.add(producer)) {
                continue;
            }

            // A separation mark on the producer is passed on to its same-stage consumers.
            final boolean producerIsSeparated = separationMarks.contains(producer);

            for (Channel outputChannel : producer.getOutputChannels()) {
                for (ExecutionTask consumer : outputChannel.getConsumers()) {
                    // Consumers assigned to a different stage need not be split off.
                    if (this.assignedInterimStages.get(consumer) != stage) {
                        continue;
                    }

                    // The criteria are only consulted if the producer is not marked already.
                    final boolean isSplitRequested = producerIsSeparated
                            || this.splittingCriteria.stream().anyMatch(
                            criterion -> criterion.shouldSplit(producer, outputChannel, consumer)
                    );

                    if (isSplitRequested) {
                        if (consumer.isFeedbackInput(outputChannel)) {
                            // TODO: Use marks to implement same-stage splits
                            // (e.g., via channel.setStageExecutionBarrier(true)).
                            // Until then, the feedback edge is not followed at all.
                            continue;
                        }
                        separationMarks.add(consumer);
                    }
                    pendingTasks.add(consumer);
                }
            }
        }

        // Without any marks, the stage stays as it is.
        if (separationMarks.isEmpty()) {
            return;
        }

        assert separationMarks.size() < stage.getTasks().size() : String.format(
                "Cannot separate all tasks from stage with tasks %s.", separationMarks
        );

        // Remember the unmarked ExecutionTasks before the marks are consumed below.
        final Set<ExecutionTask> retainedTasks = new HashSet<>(stage.getTasks());
        retainedTasks.removeAll(separationMarks);

        // Split off one stage per connected component of the marked ExecutionTasks.
        // NOTE(review): presumably separateConnectedComponent(...) removes the returned component from the passed
        // set, which is what lets the loops below terminate -- confirm against its implementation.
        while (!separationMarks.isEmpty()) {
            final Set<ExecutionTask> splitOffComponent = this.separateConnectedComponent(separationMarks);
            this.applySplittingCriteria(this.splitStage(stage, splitOffComponent));
        }

        // Likewise split the unmarked ExecutionTasks into connected components. The component that empties the
        // set is not split off, so that it remains in the original stage.
        Set<ExecutionTask> retainedComponent = this.separateConnectedComponent(retainedTasks);
        while (!retainedTasks.isEmpty()) {
            this.applySplittingCriteria(this.splitStage(stage, retainedComponent));
            retainedComponent = this.separateConnectedComponent(retainedTasks);
        }
    }