in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/platform/CrossPlatformExecutor.java [690:744]
/**
 * Creates a new instance for the given {@link ExecutionStage} and sorts the inbound
 * {@link Channel}s of that stage into the categories tracked by this instance.
 * <p>If the stage reports a loop, the matching loop context is fetched (or created) from the
 * enclosing {@link CrossPlatformExecutor} and an obtained reference is noted on it; otherwise
 * no loop context is kept.</p>
 * <p>For a loop head stage, every inbound {@link Channel} is inspected once per consumer that
 * belongs to this stage and is registered as an iteration input, an initialization input, or
 * (as fallback) a regular, loop-invariant input. For any other stage, all inbound
 * {@link Channel}s are regular inputs, and those accepted by {@code checkIfIsLoopInput} are
 * additionally registered as loop-invariant.</p>
 *
 * @param stage               the {@link ExecutionStage} whose inbound {@link Channel}s are classified
 * @param optimizationContext the {@link OptimizationContext} to store along with the {@code stage}
 */
private StageActivator(ExecutionStage stage, OptimizationContext optimizationContext) {
    this.stage = stage;
    this.optimizationContext = optimizationContext;

    // Stages outside of loops have no loop context; the others note a reference on theirs.
    if (this.stage.getLoop() == null) {
        this.loopContext = null;
    } else {
        this.loopContext = CrossPlatformExecutor.this.getOrCreateLoopContext(this.stage.getLoop());
        this.loopContext.noteObtainedReference();
    }

    final Collection<Channel> inboundChannels = this.stage.getInboundChannels();

    // Simple case first: without a loop head, every inbound Channel is a regular one.
    if (!this.stage.isLoopHead()) {
        this.miscInboundChannels.addAll(inboundChannels);
        // Some of them might nevertheless be inputs to a loop.
        for (Channel channel : inboundChannels) {
            if (!this.checkIfIsLoopInput(channel)) continue;
            this.loopInvariantInboundChannels.add(channel);
        }
        return;
    }

    // From here on, we are dealing with a loop head stage.
    assert this.stage.getAllTasks().size() == 1 : String.format("Loop head stage looks like this:\n%s", this.stage.getPlanAsString("! "));

    // Loop heads do not require all of their inputs, so each inbound Channel has to be classified.
    for (Channel channel : inboundChannels) {
        for (ExecutionTask consumer : channel.getConsumers()) {
            // Only consumers within this very ExecutionStage are of interest.
            if (consumer.getStage() == this.stage) {
                // Inputs of the loop head operator may carry iteration semantics.
                boolean isSpecialInput = false;
                if (consumer.getOperator().isLoopHead()) {
                    final LoopHeadOperator loopHeadOperator = (LoopHeadOperator) consumer.getOperator();
                    final InputSlot<?> input = consumer.getInputSlotFor(channel);
                    if (loopHeadOperator.getLoopBodyInputs().contains(input)) {
                        this.iterationInboundChannels.add(channel);
                        isSpecialInput = true;
                    } else if (loopHeadOperator.getLoopInitializationInputs().contains(input)) {
                        this.initializationInboundChannels.add(channel);
                        isSpecialInput = true;
                    }
                }

                // Anything else is a regular inbound Channel that is expected to be a loop input.
                if (!isSpecialInput) {
                    this.miscInboundChannels.add(channel);
                    assert this.checkIfIsLoopInput(channel) :
                            String.format("%s is not a loop input as expected.", channel);
                    this.loopInvariantInboundChannels.add(channel);
                }
            }
        }
    }
}