private StageActivator()

in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/CrossPlatformExecutor.java [690:744]


        /**
         * Creates a new instance for the given {@link ExecutionStage} and sorts the inbound
         * {@link Channel}s of that stage into the Channel collections of this instance.
         * <p>If the stage reports a loop via {@code getLoop()}, the matching loop context is obtained
         * from the enclosing {@link CrossPlatformExecutor} and the obtained reference is noted on it;
         * otherwise the loop context of this instance is {@code null}.</p>
         * <p>For a stage that is not a loop head, every inbound Channel is registered as a regular
         * ("misc") one and additionally as loop-invariant if {@code checkIfIsLoopInput} accepts it.
         * For a loop head stage, the inputs of the loop head operator that are loop body inputs or loop
         * initialization inputs are registered as iteration or initialization Channels, respectively;
         * any other Channel consumed within the stage is registered as both regular and
         * loop-invariant.</p>
         *
         * @param stage               the {@link ExecutionStage} whose inbound {@link Channel}s are classified
         * @param optimizationContext the {@link OptimizationContext} that is kept with this instance
         */
        private StageActivator(ExecutionStage stage, OptimizationContext optimizationContext) {
            this.stage = stage;
            this.optimizationContext = optimizationContext;

            // A loop context is only obtained (and its reference noted) for stages that are part of a loop.
            if (this.stage.getLoop() == null) {
                this.loopContext = null;
            } else {
                this.loopContext = CrossPlatformExecutor.this.getOrCreateLoopContext(this.stage.getLoop());
                this.loopContext.noteObtainedReference();
            }

            // Classify the Channels that feed the stage.
            final Collection<Channel> stageInputs = this.stage.getInboundChannels();
            if (!this.stage.isLoopHead()) {
                // Without a loop head, every inbound Channel is a regular one...
                this.miscInboundChannels.addAll(stageInputs);

                // ...although some of them may additionally be loop inputs.
                for (Channel channel : stageInputs) {
                    if (this.checkIfIsLoopInput(channel)) {
                        this.loopInvariantInboundChannels.add(channel);
                    }
                }

            } else {
                assert this.stage.getAllTasks().size() == 1 : String.format("Loop head stage looks like this:\n%s", this.stage.getPlanAsString("! "));

                // A loop head does not need all of its inputs at once, so they are told apart by their role.
                for (Channel channel : stageInputs) {
                    for (ExecutionTask consumer : channel.getConsumers()) {
                        // Consumers that live in a different ExecutionStage are of no interest here.
                        if (consumer.getStage() != this.stage) {
                            continue;
                        }

                        // Inputs with iteration semantics go to their dedicated collections.
                        boolean specialInput = false;
                        if (consumer.getOperator().isLoopHead()) {
                            final LoopHeadOperator loopHeadOperator = (LoopHeadOperator) consumer.getOperator();
                            final InputSlot<?> fedInput = consumer.getInputSlotFor(channel);
                            if (loopHeadOperator.getLoopBodyInputs().contains(fedInput)) {
                                this.iterationInboundChannels.add(channel);
                                specialInput = true;
                            } else if (loopHeadOperator.getLoopInitializationInputs().contains(fedInput)) {
                                this.initializationInboundChannels.add(channel);
                                specialInput = true;
                            }
                        }

                        // Everything else is a regular inbound Channel that is expected to be a loop input.
                        if (!specialInput) {
                            this.miscInboundChannels.add(channel);
                            assert this.checkIfIsLoopInput(channel) :
                                    String.format("%s is not a loop input as expected.", channel);
                            this.loopInvariantInboundChannels.add(channel);
                        }
                    }
                }
            }
        }