in fe/src/main/java/org/apache/impala/analysis/StmtRewriter.java [569:849]
/**
 * Merges the subquery contained in 'expr' into 'stmt'.
 *
 * The subquery's statement is wrapped in a new InlineViewRef (with freshly generated
 * column labels). That view is appended to stmt's FROM clause and joined to the table
 * ref that was previously last in the FROM clause.
 *
 * The ON clause of that join is built from two sources:
 * - the correlated predicates extracted from the subquery;
 * - a join conjunct derived from 'expr'.
 *
 * Depending on the case, some conjuncts are instead added to stmt's WHERE clause:
 * - the disjunctive-subquery case;
 * - the non-NULL-on-empty aggregate case;
 * - the CROSS JOIN fallback.
 *
 * Side effects:
 * - The subquery statement's LIMIT may be cleared (correlated) or set (runtime scalar,
 *   or uncorrelated EXISTS).
 * - stmt.fromClause_ and stmt.whereClause_ are modified.
 * - The join operator and ON clause of the new inline view are set.
 *
 * @param stmt the parent select block containing 'expr'; receives the new inline view
 * @param expr an expression containing a subquery (expr.getSubquery() must be non-null)
 * @param analyzer analyzer used to analyze the new inline view and rewritten exprs
 * @return true if the rewrite may add a new visible tuple to stmt's select list, i.e.
 *     for the CROSS JOIN rewrite, or when a LEFT OUTER JOIN was chosen and an ON clause
 *     is generated; false otherwise (including the EXISTS-without-ON-clause case)
 * @throws AnalysisException if the subquery form is not supported by this rewrite, or
 *     if analyzing the generated inline view fails
 */
private static boolean mergeExpr(SelectStmt stmt, Expr expr, Analyzer analyzer)
throws AnalysisException {
Preconditions.checkNotNull(expr);
Preconditions.checkNotNull(analyzer);
// Set to true below whenever the rewrite switches to a LEFT OUTER JOIN; returned to the
// caller at the end of the method.
boolean updateSelectList = false;
SelectStmt subqueryStmt = (SelectStmt) expr.getSubquery().getStatement();
boolean isScalarSubquery = expr.getSubquery().isScalarSubquery();
boolean isScalarColumn = expr.getSubquery().returnsScalarColumn();
// NOTE(review): presumably a scalar subquery whose single-row guarantee can only be
// enforced at runtime (see the IMPALA-5100 TODO further below) — confirm in
// SelectStmt.isRuntimeScalar().
boolean isRuntimeScalar = subqueryStmt.isRuntimeScalar();
boolean isDisjunctive = hasSubqueryInDisjunction(expr);
// Create a new inline view from the subquery stmt. The inline view will be added
// to the stmt's table refs later. Explicitly set the inline view's column labels
// to eliminate any chance that column aliases from the parent query could reference
// select items from the inline view after the rewrite.
// One fresh label is generated (by the subquery's own alias generator) per column
// label of the subquery.
List<String> colLabels = new ArrayList<>();
for (int i = 0; i < subqueryStmt.getColLabels().size(); ++i) {
colLabels.add(subqueryStmt.getColumnAliasGenerator().getNextAlias());
}
InlineViewRef inlineView =
new InlineViewRef(stmt.getTableAliasGenerator().getNextAlias(), subqueryStmt,
colLabels);
// To handle a subquery in a disjunct, we need to pull out the subexpression that
// is the immediate parent of the subquery and prepare to add additional predicates
// to the WHERE clause of 'stmt'.
// These three lists stay null unless 'isDisjunctive'; they are filled by
// replaceSubqueryInDisjunct() and consumed in the disjunctive branch further below.
List<Expr> whereClauseConjuncts = null;
List<Expr> whereClauseSmapLhs = null;
List<Expr> whereClauseSmapRhs = null;
if (isDisjunctive) {
whereClauseConjuncts = new ArrayList<Expr>();
whereClauseSmapLhs = new ArrayList<Expr>();
whereClauseSmapRhs = new ArrayList<Expr>();
// From here on 'expr' refers to the replacement returned by
// replaceSubqueryInDisjunct(), not the original predicate.
expr = replaceSubqueryInDisjunct(expr, inlineView, subqueryStmt,
whereClauseConjuncts, whereClauseSmapLhs, whereClauseSmapRhs);
}
// Extract all correlated predicates from the subquery.
List<Expr> onClauseConjuncts = extractCorrelatedPredicates(subqueryStmt);
if (!onClauseConjuncts.isEmpty()) {
validateCorrelatedSubqueryStmt(expr);
// For correlated subqueries, a LIMIT clause has no effect on the results, so we can
// safely remove it.
// (A LimitElement with null limit and offset expressions replaces the existing one.)
subqueryStmt.limitElement_ = new LimitElement(null, null);
}
// If runtime scalar, we need to prevent the propagation of predicates into the
// inline view by setting a limit on the statement.
if (isRuntimeScalar) subqueryStmt.setLimit(2);
// Update the subquery's select list and/or its GROUP BY clause by adding
// exprs from the extracted correlated predicates.
// The GROUP BY is updated for scalar subqueries, and for EXISTS subqueries that have
// aggregation but are not SELECT DISTINCT.
boolean updateGroupBy = isScalarSubquery
|| (expr instanceof ExistsPredicate
&& !subqueryStmt.getSelectList().isDistinct()
&& subqueryStmt.hasMultiAggInfo());
// Parallel lists filled by updateInlineView(); they become the smap (lhs -> rhs) that
// is applied to the ON clause after the EXISTS-without-ON-clause early return below.
List<Expr> lhsExprs = new ArrayList<>();
List<Expr> rhsExprs = new ArrayList<>();
for (Expr conjunct : onClauseConjuncts) {
updateInlineView(inlineView, conjunct, stmt.getTableRefIds(), lhsExprs, rhsExprs,
updateGroupBy);
}
// Analyzing the inline view triggers reanalysis of the subquery's select statement.
// However the statement is already analyzed and since statement analysis is not
// idempotent, the analysis needs to be reset.
inlineView.reset();
try {
inlineView.analyze(analyzer);
} catch (AnalysisException e) {
// We can't identify all the aggregate functions until the subquery is fully
// analyzed, so we need to catch the exception here and produce a more helpful
// error message.
if (isDisjunctive && subqueryStmt.hasAggregate(/*includeDistinct=*/ false)) {
// TODO: IMPALA-5098: we could easily support this if DISTINCT and aggregates
// were supported in the same query block.
throw new AnalysisException("Aggregate functions in subquery in disjunction " +
"not supported: " + subqueryStmt.toSql());
}
throw e;
}
// The new inline view is joined to the table ref that is currently last in stmt's
// FROM clause, and is then appended after it.
inlineView.setLeftTblRef(stmt.fromClause_.get(stmt.fromClause_.size() - 1));
stmt.fromClause_.add(inlineView);
// Create a join conjunct from the expr that contains a subquery.
// May be null; the EXISTS handling below relies on that.
Expr joinConjunct =
createJoinConjunct(expr, inlineView, analyzer, !onClauseConjuncts.isEmpty());
// Default join type; overridden below for the LEFT OUTER JOIN, CROSS JOIN and
// anti-join cases.
JoinOperator joinOp = JoinOperator.LEFT_SEMI_JOIN;
if (isDisjunctive) {
// Special case handling of disjunctive subqueries - add the WHERE conjuncts
// generated above and convert to a LEFT OUTER JOIN so we can reference slots
// from subquery.
// The smap RHS exprs are analyzed before the smap is built and applied.
for (Expr rhsExpr : whereClauseSmapRhs) {
rhsExpr.analyze(analyzer);
}
ExprSubstitutionMap smap =
new ExprSubstitutionMap(whereClauseSmapLhs, whereClauseSmapRhs);
for (Expr pred : whereClauseConjuncts) {
pred = pred.substitute(smap, analyzer, false);
stmt.whereClause_ =
CompoundPredicate.createConjunction(pred, stmt.whereClause_);
}
joinOp = JoinOperator.LEFT_OUTER_JOIN;
updateSelectList = true;
if (joinConjunct != null) onClauseConjuncts.add(joinConjunct);
} else if (joinConjunct != null) {
SelectListItem firstItem =
((SelectStmt) inlineView.getViewStmt()).getSelectList().getItems().get(0);
if (!onClauseConjuncts.isEmpty() && firstItem.getExpr() != null &&
firstItem.getExpr().contains(Expr.NON_NULL_EMPTY_AGG)) {
// Correlated subqueries with an aggregate function that returns non-null on
// an empty input are rewritten using a LEFT OUTER JOIN because we
// need to ensure that there is one agg value for every tuple of 'stmt'
// (parent select block), even for those tuples of 'stmt' that get rejected
// by the subquery due to some predicate. The new join conjunct is added to
// stmt's WHERE clause because it needs to be applied to the result of the
// LEFT OUTER JOIN (both matched and unmatched tuples).
//
// TODO Handle other aggregate functions and UDAs that return a non-NULL value
// on an empty set.
// TODO Handle count aggregate functions in an expression in subqueries
// select list.
stmt.whereClause_ =
CompoundPredicate.createConjunction(joinConjunct, stmt.whereClause_);
// Nulling joinConjunct keeps it out of the ON clause (see the add just below).
joinConjunct = null;
joinOp = JoinOperator.LEFT_OUTER_JOIN;
updateSelectList = true;
}
if (joinConjunct != null) onClauseConjuncts.add(joinConjunct);
}
// Ensure that all the extracted correlated predicates can be added to the ON-clause
// of the generated join.
if (!onClauseConjuncts.isEmpty()) {
validateCorrelatedPredicates(expr, inlineView, onClauseConjuncts);
}
// Create the ON clause from the extracted correlated predicates.
Expr onClausePredicate =
CompoundPredicate.createConjunctivePredicate(onClauseConjuncts);
if (onClausePredicate == null) {
// No correlated predicates and no join conjunct: only a (NOT) EXISTS predicate can
// reach this point.
Preconditions.checkState(expr instanceof ExistsPredicate);
ExistsPredicate existsPred = (ExistsPredicate) expr;
// TODO This is very expensive if uncorrelated. Remove it when we implement
// independent subquery evaluation.
if (existsPred.isNotExists()) {
inlineView.setJoinOp(JoinOperator.LEFT_ANTI_JOIN);
} else {
inlineView.setJoinOp(JoinOperator.LEFT_SEMI_JOIN);
}
// Note that the concept of a 'correlated inline view' is similar but not the same
// as a 'correlated subquery', i.e., a subquery with a correlated predicate.
if (!inlineView.isCorrelated()) {
// For uncorrelated subqueries, we limit the number of rows returned by the
// subquery.
subqueryStmt.setLimit(1);
inlineView.setOnClause(new BoolLiteral(true));
}
return false;
}
// Create an smap from the original select-list exprs of the select list to
// the corresponding inline-view columns.
ExprSubstitutionMap smap = new ExprSubstitutionMap();
Preconditions.checkState(lhsExprs.size() == rhsExprs.size());
for (int i = 0; i < lhsExprs.size(); ++i) {
Expr lhsExpr = lhsExprs.get(i);
Expr rhsExpr = rhsExprs.get(i);
rhsExpr.analyze(analyzer);
smap.put(lhsExpr, rhsExpr);
}
onClausePredicate = onClausePredicate.substitute(smap, analyzer, false);
// Check for references to ancestor query blocks (cycles in the dependency
// graph of query blocks are not supported).
if (!onClausePredicate.isBoundByTupleIds(stmt.getTableRefIds())) {
throw new AnalysisException(
"Unsupported correlated subquery: " + subqueryStmt.toSql());
}
// Check if we have a valid ON clause for an equi-join.
// A qualifying conjunct is an equivalence BinaryPredicate where:
// - the lhs references some tuple or is a constant;
// - the rhs references some tuple;
// - neither side mixes the inline view's tuple with other tuples.
boolean hasEqJoinPred = false;
for (Expr conjunct : onClausePredicate.getConjuncts()) {
if (!(conjunct instanceof BinaryPredicate)) continue;
BinaryPredicate.Operator operator = ((BinaryPredicate) conjunct).getOp();
if (!operator.isEquivalence()) continue;
List<TupleId> lhsTupleIds = new ArrayList<>();
conjunct.getChild(0).getIds(lhsTupleIds, null);
// Allows for constants to be a join predicate.
if (lhsTupleIds.isEmpty() && !conjunct.getChild(0).isConstant()) continue;
List<TupleId> rhsTupleIds = new ArrayList<>();
conjunct.getChild(1).getIds(rhsTupleIds, null);
if (rhsTupleIds.isEmpty()) continue;
// Check if columns from the outer query block (stmt) appear in both sides
// of the binary predicate.
if ((lhsTupleIds.contains(inlineView.getDesc().getId()) &&
lhsTupleIds.size() > 1) ||
(rhsTupleIds.contains(inlineView.getDesc().getId()) &&
rhsTupleIds.size() > 1)) {
continue;
}
hasEqJoinPred = true;
break;
}
// Fallback: no usable equi-join conjunct, an uncorrelated inline view, and no
// disjunction -> try to rewrite as a CROSS JOIN with the ON predicates moved to WHERE.
if (!hasEqJoinPred && !inlineView.isCorrelated() && !isDisjunctive) {
// IMPALA-13991: It is not safe to rewrite into CROSS_JOIN if
// isDisjunctive is True, regardless of joinConjunct value.
// If joinConjunct is NOT NULL, the inlineView maybe correlated through
// that joinConjunct.
// If joinConjunct is NULL, then expr is a (NOT) EXISTS predicate.
// EXISTS within a disjunct is not supported yet (see IMPALA-9931).
// TODO: IMPALA-9948: we could support non-equi joins here
// TODO: Remove this when independent subquery evaluation is implemented.
// TODO: IMPALA-5100 to cover all cases, we do let through runtime scalars with
// group by clauses to allow for subqueries where we haven't implemented plan time
// expression evaluation to ensure only a single row is returned. This may expose
// runtime errors in the presence of multiple runtime scalar subqueries until we
// implement independent evaluation.
boolean hasGroupBy = ((SelectStmt) inlineView.getViewStmt()).hasGroupByClause();
if ((!isScalarSubquery && !isRuntimeScalar)
|| (hasGroupBy && !stmt.selectList_.isDistinct() && !isScalarColumn
&& !isRuntimeScalar)) {
throw new AnalysisException(
"Unsupported predicate with subquery: " + expr.toSql());
}
// TODO: Requires support for null-aware anti-join mode in nested-loop joins
if (isScalarSubquery && expr instanceof InPredicate &&
((InPredicate) expr).isNotIn()) {
throw new AnalysisException(
"Unsupported NOT IN predicate with subquery: " + expr.toSql());
}
// We can rewrite the aggregate subquery using a cross join. All conjuncts
// that were extracted from the subquery are added to stmt's WHERE clause.
stmt.whereClause_ =
CompoundPredicate.createConjunction(onClausePredicate, stmt.whereClause_);
inlineView.setJoinOp(JoinOperator.CROSS_JOIN);
// Indicate this inline view returns at most one value through a
// non-correlated scalar subquery.
if (isScalarSubquery) inlineView.setIsNonCorrelatedScalarSubquery();
// Indicate that the CROSS JOIN may add a new visible tuple to stmt's
// select list (if the latter contains an unqualified star item '*')
return true;
}
// We have a valid equi-join conjunct or the inline view is correlated.
// Operator precedence: '&&' binds tighter than '||', so this condition reads as
// (NOT IN) || (NOT EXISTS).
// NOTE(review): this overrides any joinOp chosen above (e.g. LEFT_OUTER_JOIN in the
// disjunctive case); presumably such combinations cannot reach here — confirm.
if (expr instanceof InPredicate && ((InPredicate) expr).isNotIn() ||
expr instanceof ExistsPredicate && ((ExistsPredicate) expr).isNotExists()) {
// For the case of a NOT IN with an eq join conjunct, replace the join
// conjunct with a conjunct that uses the null-matching eq operator.
if (expr instanceof InPredicate) {
joinOp = JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
List<TupleId> tIds = new ArrayList<>();
// NOTE(review): joinConjunct is dereferenced without a null check, but it is set
// to null in the NON_NULL_EMPTY_AGG branch above. Presumably that branch is
// unreachable for NOT IN (correlated IN subqueries with aggregation being
// rejected earlier) — confirm in validateCorrelatedSubqueryStmt().
joinConjunct.getIds(tIds, null);
// The join conjunct must reference the inline view and at least one other tuple.
if (tIds.size() <= 1 || !tIds.contains(inlineView.getDesc().getId())) {
throw new AnalysisException(
"Unsupported NOT IN predicate with subquery: " + expr.toSql());
}
// Replace the EQ operator in the generated join conjunct with a
// null-matching EQ operator.
// The conjunct is located via Expr.equals(). This relies on the smap
// substitution above leaving the join conjunct unchanged; if no conjunct matches,
// the operator is silently left as-is. NOTE(review): confirm this always matches.
for (Expr conjunct : onClausePredicate.getConjuncts()) {
if (conjunct.equals(joinConjunct)) {
Preconditions.checkState(conjunct instanceof BinaryPredicate);
BinaryPredicate binaryPredicate = (BinaryPredicate) conjunct;
Preconditions.checkState(binaryPredicate.getOp().isEquivalence());
binaryPredicate.setOp(BinaryPredicate.Operator.NULL_MATCHING_EQ);
break;
}
}
} else {
joinOp = JoinOperator.LEFT_ANTI_JOIN;
}
}
inlineView.setJoinOp(joinOp);
inlineView.setOnClause(onClausePredicate);
return updateSelectList;
}