in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/PlanEnumerator.java [394:445]
private List<Operator> collectBranchOperatorsStartingFrom(Operator startOperator) {
List<Operator> branch = new LinkedList<>();
Operator currentOperator = startOperator;
while (true) {
boolean isEnumeratable = currentOperator.isExecutionOperator() ||
currentOperator.isAlternative() ||
currentOperator.isLoopSubplan();
if (!isEnumeratable) {
this.logger.trace("Cannot enumerate branch with {}.", currentOperator);
return null;
}
branch.add(currentOperator);
// Cut branches if requested.
if (!this.isEnumeratingBranchesFirst) break;
// Try to advance. This requires certain conditions, though.
OutputSlot<?> followableOutput;
if (currentOperator.isLoopHead()) {
LoopHeadOperator loopHeadOperator = (LoopHeadOperator) currentOperator;
if (loopHeadOperator.getLoopBodyOutputs().size() != 1) {
break;
}
followableOutput = WayangCollections.getSingle(loopHeadOperator.getLoopBodyOutputs());
} else {
if (currentOperator.getNumOutputs() != 1) {
break;
}
followableOutput = currentOperator.getOutput(0);
}
if (followableOutput.getOccupiedSlots().size() != 1) {
this.logger.trace("Stopping branch, because operator does not feed exactly one operator.");
break;
}
Operator nextOperator = currentOperator.getOutput(0).getOccupiedSlots().get(0).getOwner();
if (nextOperator.getNumInputs() != 1) {
this.logger.trace("Stopping branch, because next operator does not have exactly one input.");
break;
}
if (nextOperator == startOperator) {
break;
}
currentOperator = nextOperator;
}
this.logger.trace("Determined branch: {}.", currentOperator);
return branch;
}