in wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/optimizer/enumeration/PlanEnumerator.java [454:522]
private PlanEnumeration enumerateBranch(List<Operator> branch, OptimizationContext optimizationContext) {
PlanEnumeration branchEnumeration = null;
Operator lastOperator = null;
for (Operator operator : branch) {
PlanEnumeration operatorEnumeration;
if (operator.isAlternative()) {
operatorEnumeration = this.enumerateAlternative((OperatorAlternative) operator, optimizationContext);
if (operatorEnumeration == null || operatorEnumeration.getPlanImplementations().isEmpty()) {
this.logger.warn("No implementations enumerated for {}.", operator);
return null;
}
} else if (operator.isLoopSubplan()) {
operatorEnumeration = this.enumerateLoop((LoopSubplan) operator, optimizationContext);
} else {
assert operator.isExecutionOperator();
operatorEnumeration = PlanEnumeration.createSingleton((ExecutionOperator) operator, optimizationContext);
// Check if the operator is filtered.
// However, we must not filter operators that are pre-settled (i.e., that have been executed already).
boolean isPresettled = false;
OperatorContainer container = operator.getContainer();
if (container instanceof OperatorAlternative.Alternative) {
OperatorAlternative.Alternative alternative = (OperatorAlternative.Alternative) container;
OperatorAlternative operatorAlternative = alternative.getOperatorAlternative();
isPresettled = this.presettledAlternatives.get(operatorAlternative) == alternative;
}
if (!isPresettled) {
OptimizationContext.OperatorContext operatorContext = optimizationContext.getOperatorContext(operator);
if (operatorContext != null && ((ExecutionOperator) operator).isFiltered(operatorContext)) {
this.logger.info("Filtered {} with context {}.", operator, operatorContext);
operatorEnumeration.getPlanImplementations().clear();
}
}
}
if (operatorEnumeration.getPlanImplementations().isEmpty()) {
if (this.isTopLevel()) {
throw new WayangException(String.format("No implementations enumerated for %s.", operator));
} else {
this.logger.warn("No implementations enumerated for {}.", operator);
}
}
if (branchEnumeration == null) {
branchEnumeration = operatorEnumeration;
} else {
final OutputSlot<?> output = lastOperator.getOutput(0);
branchEnumeration = branchEnumeration.concatenate(
output,
this.openChannels.get(output),
Collections.singletonMap(operator.getInput(0), operatorEnumeration),
optimizationContext,
this.timeMeasurement);
if (branchEnumeration.getPlanImplementations().isEmpty()) {
if (this.isTopLevel()) {
throw new WayangException(String.format("Could not concatenate %s to %s.", lastOperator, operator));
} else {
this.logger.warn("Could not concatenate {} to {}.", lastOperator, operator);
}
}
this.prune(branchEnumeration);
}
lastOperator = operator;
}
return branchEnumeration;
}