private boolean partitionStage()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/StageAssignmentTraversal.java [591:632]


    /**
     * Partitions a marked stage whose {@code ExecutionTask}s do not all have the same number of required stages.
     * <p>Tasks with the smallest number of required stages are kept; all other tasks are extracted, one connected
     * component at a time, and each component is handed to {@code splitStage(...)}. The kept tasks are then broken
     * into connected components as well, where the last extracted component is not passed to
     * {@code splitStage(...)}.</p>
     *
     * @param stage the stage to inspect and, if necessary, split
     * @return {@code true} if at least one task was singled out for separation; {@code false} if the stage was not
     * marked or all of its tasks have equally many required stages
     */
    private boolean partitionStage(InterimStage stage) {
        // Short-cut: a stage that has not been marked did not see any change in its required stages.
        if (!stage.getAndResetSplitMark()) {
            return false;
        }

        // Bucket the tasks: those with the fewest required stages seen so far vs. all the others.
        final Collection<ExecutionTask> minimalTasks = new LinkedList<>();
        final Set<ExecutionTask> separableTasks = new HashSet<>();
        int fewestRequired = -1; // -1 means that no task has been inspected yet
        for (ExecutionTask task : stage.getTasks()) {
            final int numRequired = this.requiredStages.get(task).size();
            if (fewestRequired == -1 || numRequired < fewestRequired) {
                // New minimum: everything collected as "minimal" so far has to be separated instead.
                separableTasks.addAll(minimalTasks);
                minimalTasks.clear();
                fewestRequired = numRequired;
            }
            if (numRequired == fewestRequired) {
                minimalTasks.add(task);
            } else {
                separableTasks.add(task);
            }
        }

        if (separableTasks.isEmpty()) {
            logger.debug("No separable tasks found in marked stage {}.", stage);
            return false;
        }

        // Remember the tasks that are not separated before the separation consumes the other set.
        final HashSet<ExecutionTask> remainingTasks = new HashSet<>(stage.getTasks());
        remainingTasks.removeAll(separableTasks);

        // Split off every connected component among the tasks to separate (the set is non-empty here).
        while (!separableTasks.isEmpty()) {
            final Set<ExecutionTask> separatedComponent = this.separateConnectedComponent(separableTasks);
            this.splitStage(stage, separatedComponent);
        }

        // Break the remainder into connected components, too. The component whose extraction empties the
        // remainder is not split off, which avoids "splitting" a remainder that is already connected.
        Set<ExecutionTask> keptComponent = this.separateConnectedComponent(remainingTasks);
        while (!remainingTasks.isEmpty()) {
            this.splitStage(stage, keptComponent);
            keptComponent = this.separateConnectedComponent(remainingTasks);
        }
        return true;
    }