in asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/OperatorUtils.java [51:150]
/**
 * Walks the plan rooted at {@code op} and, for every leaf branch that ends in a
 * {@link DataSourceScanOperator} producing (directly or through an assign) at least one of the
 * requested distinct variables, builds a distinct operator via {@code createDistinctOp} and
 * records it in {@code scanAndDistinctOps}, keyed by the scan.
 *
 * <p>Inner and left-outer joins are handled by recursing into each of their inputs; every other
 * operator chain is followed through its first input until an EMPTYTUPLESOURCE (or an operator
 * without inputs) is reached.
 *
 * <p>Side effect: when a GROUP operator maps a requested variable to a plain variable reference,
 * the entry in the caller's distinct-variable list is replaced (removed and re-appended) by the
 * referenced variable, and the same substitution is applied to every distinct function through
 * {@code replaceVariableInFunction}.
 *
 * @param op                  root of the (sub-)plan to inspect; {@code null} is a no-op
 * @param distinctVarsFuncPair pair of (variables to count distinct values for, functions that
 *                            reference those variables); an empty variable list is a no-op
 * @param context             optimization context, forwarded to {@code createDistinctOp}
 * @param scanAndDistinctOps  output map; receives scan -> distinct operator for each branch where
 *                            a distinct operator could be created
 */
public static void createDistinctOpsForJoinNodes(ILogicalOperator op,
        Pair<List<LogicalVariable>, List<AbstractFunctionCallExpression>> distinctVarsFuncPair,
        IOptimizationContext context, HashMap<DataSourceScanOperator, ILogicalOperator> scanAndDistinctOps) {
    List<LogicalVariable> distinctVars = distinctVarsFuncPair.getFirst();
    List<AbstractFunctionCallExpression> distinctFunctions = distinctVarsFuncPair.getSecond();
    if (op == null || distinctVars.isEmpty()) {
        return;
    }

    ILogicalOperator currentOp = op;
    LogicalOperatorTag tag = currentOp.getOperatorTag();

    // A join at the root: each input is an independent branch, handle them recursively.
    if (tag == LogicalOperatorTag.INNERJOIN || tag == LogicalOperatorTag.LEFTOUTERJOIN) {
        for (int i = 0; i < currentOp.getInputs().size(); i++) {
            ILogicalOperator nextOp = currentOp.getInputs().get(i).getValue();
            createDistinctOpsForJoinNodes(nextOp, distinctVarsFuncPair, context, scanAndDistinctOps);
        }
        return;
    }

    // Non-join root: follow the chain of first inputs down to the leaf.
    List<LogicalVariable> foundDistinctVars = new ArrayList<>();
    ILogicalOperator selOp = null;
    ILogicalOperator assignOp = null;
    DataSourceScanOperator scanOp = null;

    while (tag != LogicalOperatorTag.EMPTYTUPLESOURCE) {
        if (tag == LogicalOperatorTag.SELECT) {
            ILogicalOperator nextOp = currentOp.getInputs().get(0).getValue();
            LogicalOperatorTag nextTag = nextOp.getOperatorTag();
            boolean sitsOnAssignOrScan =
                    nextTag == LogicalOperatorTag.ASSIGN || nextTag == LogicalOperatorTag.DATASOURCESCAN;
            // Remember only the first such SelectOp, and only if no matching AssignOp was seen above it.
            if (sitsOnAssignOrScan && selOp == null && assignOp == null) {
                selOp = currentOp;
            }
        } else if (tag == LogicalOperatorTag.ASSIGN) {
            // NOTE(review): only the first variable produced by the AssignOp is inspected; an assign
            // producing several variables would have the others ignored -- confirm this is intended.
            LogicalVariable assignVar = ((AssignOperator) currentOp).getVariables().get(0);
            if (distinctVars.contains(assignVar)) {
                if (assignOp == null) { // first AssignOp that produces a requested variable
                    assignOp = currentOp;
                }
                foundDistinctVars.add(assignVar);
            }
        } else if (tag == LogicalOperatorTag.DATASOURCESCAN) {
            scanOp = (DataSourceScanOperator) currentOp;
            // Collect the requested variables that the scan itself produces.
            for (LogicalVariable scanVar : scanOp.getVariables()) {
                if (distinctVars.contains(scanVar)) {
                    foundDistinctVars.add(scanVar);
                }
            }
            if (foundDistinctVars.isEmpty()) {
                // Neither this scan nor the assigns above it produce a requested variable.
                scanOp = null;
            }
        } else if (tag == LogicalOperatorTag.GROUP) {
            // GroupByOp reached while walking a chain (not as a direct input of a JoinOp).
            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> nestedGrpVarsList =
                    ((GroupByOperator) currentOp).getGroupByList();
            // Replace any requested variable that this GroupByOp defines as a plain variable reference
            // with the variable it refers to, so it can be matched further down the chain.
            for (int i = 0; i < nestedGrpVarsList.size(); i++) {
                LogicalVariable prevVar = nestedGrpVarsList.get(i).first;
                int idx = distinctVars.indexOf(prevVar);
                if (idx == -1) {
                    continue;
                }
                ILogicalExpression expr = nestedGrpVarsList.get(i).second.getValue();
                if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
                    LogicalVariable newVar = ((VariableReferenceExpression) expr).getVariableReference();
                    distinctVars.remove(idx);
                    distinctVars.add(newVar);
                    // Keep the distinct functions consistent with the substituted variable.
                    for (AbstractFunctionCallExpression funcExpr : distinctFunctions) {
                        replaceVariableInFunction(funcExpr, prevVar, newVar);
                    }
                }
            }
        } else if (tag == LogicalOperatorTag.INNERJOIN || tag == LogicalOperatorTag.LEFTOUTERJOIN) {
            for (int i = 0; i < currentOp.getInputs().size(); i++) {
                ILogicalOperator nextOp = currentOp.getInputs().get(i).getValue();
                createDistinctOpsForJoinNodes(nextOp, distinctVarsFuncPair, context, scanAndDistinctOps);
            }
            break; // everything below the join was handled by the recursion
        }
        // TODO(mehnaz): handle DISTINCT and UNNEST operators (if appears in sub-queries)

        // Stop at any operator without inputs instead of failing on get(0).
        if (currentOp.getInputs().isEmpty()) {
            break;
        }
        // proceed to the next operator
        currentOp = currentOp.getInputs().get(0).getValue();
        tag = currentOp.getOperatorTag();
    }

    if (scanOp != null) {
        // Place the distinct on top of the highest relevant operator: SelectOp, else AssignOp, else the scan.
        ILogicalOperator inputOp = (selOp != null) ? selOp : ((assignOp != null) ? assignOp : scanOp);
        SourceLocation sourceLocation = inputOp.getSourceLocation();
        DistinctOperator distinctOp =
                createDistinctOp(foundDistinctVars, inputOp, sourceLocation, distinctFunctions, context);
        if (distinctOp != null) {
            scanAndDistinctOps.put(scanOp, distinctOp);
        }
    }
}