in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/PlanEnumeration.java [553:595]
/**
 * Builds a new {@link PlanEnumeration} that mirrors this one, but with its slots translated through the
 * slot mapping of the given {@link OperatorAlternative.Alternative} and with that alternative's
 * {@link OperatorAlternative} added to the scope.
 *
 * @param alternative the alternative to escape from; if {@code null}, this instance is returned unchanged
 * @return this instance if {@code alternative} is {@code null}, otherwise a newly created, escaped instance
 * @throws IllegalStateException if any entry of the serving output slots already has an input slot attached
 */
public PlanEnumeration escape(OperatorAlternative.Alternative alternative) {
    // Without an alternative there is nothing to translate.
    if (alternative == null) {
        return this;
    }

    final PlanEnumeration result = new PlanEnumeration();
    final OperatorAlternative enclosingOperator = alternative.getOperatorAlternative();

    // The new scope is the old scope plus the OperatorAlternative that owns the alternative.
    result.scope.addAll(this.scope);
    result.scope.add(enclosingOperator);

    // Translate every requested input slot upstream; slots without an upstream counterpart are dropped.
    for (InputSlot innerInput : this.requestedInputSlots) {
        final InputSlot outerInput = alternative.getSlotMapping().resolveUpstream(innerInput);
        if (outerInput == null) {
            continue;
        }
        result.requestedInputSlots.add(outerInput);
    }

    // Translate every serving output slot downstream.
    for (Tuple<OutputSlot<?>, InputSlot<?>> servedLink : this.servingOutputSlots) {
        // Only output slots that are not yet paired with an input slot can be translated.
        if (servedLink.field1 != null) {
            throw new IllegalStateException("Cannot escape a connected output slot.");
        }
        final Collection<OutputSlot<Object>> outerOutputs =
                alternative.getSlotMapping().resolveDownstream(servedLink.field0.unchecked());
        for (OutputSlot outerOutput : outerOutputs) {
            final List<InputSlot<?>> consumers = outerOutput.getOccupiedSlots();
            if (!consumers.isEmpty()) {
                // One entry per input slot that the translated output slot occupies.
                for (InputSlot consumer : consumers) {
                    result.servingOutputSlots.add(new Tuple<>(outerOutput, consumer));
                }
                continue;
            }
            // No occupied input slots: record the output slot with an empty partner.
            result.servingOutputSlots.add(new Tuple<>(outerOutput, null));
        }
    }

    // Finally, escape each PlanImplementation, handing over the new enumeration built above.
    for (PlanImplementation innerImplementation : this.planImplementations) {
        result.planImplementations.add(innerImplementation.escape(alternative, result));
    }

    return result;
}