in flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java [1192:1476]
/**
 * Converts one sub-query found in an expression and stores the resulting row expression in
 * {@code subQuery.expr}.
 *
 * <p>The method is a no-op when {@code subQuery.expr} is already set. Otherwise it dispatches on
 * the kind of {@code subQuery.node}. Several kinds are deliberately left unconverted (the method
 * returns with {@code subQuery.expr} still {@code null}):
 *
 * <ul>
 *   <li>array / map / multiset query constructors, EXISTS and scalar queries when {@code
 *       config.isExpand()} is false;
 *   <li>IN / NOT IN / SOME / ALL whose right-hand side is not a {@code SqlNodeList} when {@code
 *       config.isExpand()} is false, or whose blackboard has no root;
 *   <li>UNIQUE, always.
 * </ul>
 *
 * @param bb blackboard used to convert expressions and to register the converted relational
 *     expressions; its root may be replaced as a side effect
 * @param subQuery sub-query to convert; its {@code expr} field receives the result
 * @throws AssertionError if the kind of {@code subQuery.node} is not one of the handled kinds
 */
private void substituteSubQuery(Blackboard bb, SubQuery subQuery) {
final RexNode expr = subQuery.expr;
if (expr != null) {
// Already done.
return;
}
// These locals are shared by several cases below; each case assigns only the ones it needs.
final SqlBasicCall call;
final RelNode rel;
final SqlNode query;
final RelOptUtil.Exists converted;
switch (subQuery.node.getKind()) {
case CURSOR:
// Cursors are handled entirely by a dedicated helper.
convertCursor(bb, subQuery);
return;
case ARRAY_QUERY_CONSTRUCTOR:
case MAP_QUERY_CONSTRUCTOR:
case MULTISET_QUERY_CONSTRUCTOR:
// Query constructors are only converted here when sub-query expansion is enabled.
if (!config.isExpand()) {
return;
}
// fall through
case MULTISET_VALUE_CONSTRUCTOR:
// Value constructors are converted regardless of the expand setting.
rel = convertMultisets(ImmutableList.of(subQuery.node), bb);
subQuery.expr = bb.register(rel, JoinRelType.INNER);
return;
case IN:
case NOT_IN:
case SOME:
case ALL:
call = (SqlBasicCall) subQuery.node;
// Operand 1 is the right-hand side: either a query or an explicit value list.
query = call.operand(1);
// Without expansion, only explicit value lists are converted here.
if (!config.isExpand() && !(query instanceof SqlNodeList)) {
return;
}
// Operand 0 is the left-hand side; a ROW contributes one key per field.
final SqlNode leftKeyNode = call.operand(0);
final List<RexNode> leftKeys;
switch (leftKeyNode.getKind()) {
case ROW:
leftKeys = new ArrayList<>();
for (SqlNode sqlExpr : ((SqlBasicCall) leftKeyNode).getOperandList()) {
leftKeys.add(bb.convertExpression(sqlExpr));
}
break;
default:
leftKeys = ImmutableList.of(bb.convertExpression(leftKeyNode));
}
if (query instanceof SqlNodeList) {
SqlNodeList valueList = (SqlNodeList) query;
// When the list size under the threshold or the list references columns, we
// convert to OR.
if (valueList.size() < config.getInSubQueryThreshold()
|| valueList.accept(new SqlIdentifierFinder())) {
subQuery.expr =
convertInToOr(
bb,
leftKeys,
valueList,
(SqlInOperator) call.getOperator());
return;
}
// Otherwise, let convertExists translate
// values list into an inline table for the
// reference to Q below.
}
// Project out the search columns from the left side
// Q1:
// "select from emp where emp.deptno in (select col1 from T)"
//
// is converted to
//
// "select from
// emp inner join (select distinct col1 from T) q
// on emp.deptno = q.col1
//
// Q2:
// "select from emp where emp.deptno not in (Q)"
//
// is converted to
//
// "select from
// emp left outer join (select distinct col1, TRUE from T) q
// on emp.deptno = q.col1
// where emp.deptno <> null
// and q.indicator <> TRUE"
//
// Note: Sub-query can be used as SqlUpdate#condition like below:
//
// UPDATE emp
// SET empno = 1 WHERE emp.empno IN (
// SELECT emp.empno FROM emp WHERE emp.empno = 2)
//
// In such case, when converting SqlUpdate#condition, bb.root is null
// and it makes no sense to do the sub-query substitution.
if (bb.root == null) {
return;
}
// Row type derived from the validated type of the left-hand side; passed to
// convertExists as the target row type of the right-hand side.
final RelDataType targetRowType =
SqlTypeUtil.promoteToRowType(
typeFactory, validator().getValidatedNodeType(leftKeyNode), null);
final boolean notIn = call.getOperator().kind == SqlKind.NOT_IN;
converted =
convertExists(
query,
RelOptUtil.SubQueryType.IN,
subQuery.logic,
notIn,
targetRowType);
if (converted.indicator) {
// Generate
// emp CROSS JOIN (SELECT COUNT(*) AS c,
// COUNT(deptno) AS ck FROM dept)
final RelDataType longType = typeFactory.createSqlType(SqlTypeName.BIGINT);
// NOTE(review): relies on the first input of converted.r being the relation to
// count over - confirm against convertExists before changing either side.
final RelNode seek = converted.r.getInput(0); // fragile
final int keyCount = leftKeys.size();
// Argument ordinals 0..keyCount-1, used by the second COUNT below.
final List<Integer> args = ImmutableIntList.range(0, keyCount);
// Aggregate with an empty group set and two calls: COUNT with no arguments and
// COUNT over the key columns.
LogicalAggregate aggregate =
LogicalAggregate.create(
seek,
ImmutableList.of(),
ImmutableBitSet.of(),
null,
ImmutableList.of(
AggregateCall.create(
SqlStdOperatorTable.COUNT,
false,
false,
false,
ImmutableList.of(),
-1,
null,
RelCollations.EMPTY,
longType,
null),
AggregateCall.create(
SqlStdOperatorTable.COUNT,
false,
false,
false,
args,
-1,
null,
RelCollations.EMPTY,
longType,
null)));
// Inner join on a literal TRUE condition, i.e. the cross join described above; the
// join becomes the new root of the blackboard.
LogicalJoin join =
LogicalJoin.create(
bb.root(),
aggregate,
ImmutableList.of(),
rexBuilder.makeLiteral(true),
ImmutableSet.of(),
JoinRelType.INNER);
bb.setRoot(join, false);
}
// Register the converted sub-query against the left keys, using the join type requested
// by the conversion result.
final RexNode rex =
bb.register(
converted.r,
converted.outerJoin ? JoinRelType.LEFT : JoinRelType.INNER,
leftKeys);
RelOptUtil.Logic logic = subQuery.logic;
switch (logic) {
case TRUE_FALSE_UNKNOWN:
case UNKNOWN_AS_TRUE:
// Without an indicator column the translation falls back to TRUE_FALSE logic.
if (!converted.indicator) {
logic = RelOptUtil.Logic.TRUE_FALSE;
}
break;
default:
break;
}
subQuery.expr = translateIn(logic, bb.root, rex);
// NOT IN is expressed as NOT applied to the translated IN expression.
if (notIn) {
subQuery.expr = rexBuilder.makeCall(SqlStdOperatorTable.NOT, subQuery.expr);
}
return;
case EXISTS:
// "select from emp where exists (select a from T)"
//
// is converted to the following if the sub-query is correlated:
//
// "select from emp left outer join (select AGG_TRUE() as indicator
// from T group by corr_var) q where q.indicator is true"
//
// If there is no correlation, the expression is replaced with a
// boolean indicating whether the sub-query returned 0 or >= 1 row.
if (!config.isExpand()) {
return;
}
call = (SqlBasicCall) subQuery.node;
query = call.operand(0);
// Only a SqlSelect has a select scope to look up; any other query node is converted
// on a blackboard created with a null scope.
final SqlValidatorScope seekScope =
(query instanceof SqlSelect)
? validator().getSelectScope((SqlSelect) query)
: null;
final Blackboard seekBb = createBlackboard(seekScope, null, false);
final RelNode seekRel = convertQueryOrInList(seekBb, query, null);
requireNonNull(seekRel, () -> "seekRel is null for query " + query);
// An EXISTS sub-query whose inner child has at least 1 tuple
// (e.g. an Aggregate with no grouping columns or non-empty Values
// node) should be simplified to a Boolean constant expression.
final RelMetadataQuery mq = seekRel.getCluster().getMetadataQuery();
final Double minRowCount = mq.getMinRowCount(seekRel);
if (minRowCount != null && minRowCount >= 1D) {
subQuery.expr = rexBuilder.makeLiteral(true);
return;
}
converted =
RelOptUtil.createExistsPlan(
seekRel,
RelOptUtil.SubQueryType.EXISTS,
subQuery.logic,
true,
relBuilder);
assert !converted.indicator;
// NOTE(review): presumably convertNonCorrelatedSubQuery assigns subQuery.expr itself
// when it returns true - confirm against its definition.
if (convertNonCorrelatedSubQuery(subQuery, bb, converted.r, true)) {
return;
}
subQuery.expr = bb.register(converted.r, JoinRelType.LEFT);
return;
case UNIQUE:
// UNIQUE sub-queries are never substituted by this method.
return;
case SCALAR_QUERY:
// Convert the sub-query. If it's non-correlated, convert it
// to a constant expression.
if (!config.isExpand()) {
return;
}
call = (SqlBasicCall) subQuery.node;
query = call.operand(0);
converted =
convertExists(
query, RelOptUtil.SubQueryType.SCALAR, subQuery.logic, true, null);
assert !converted.indicator;
if (convertNonCorrelatedSubQuery(subQuery, bb, converted.r, false)) {
return;
}
// Otherwise pass the plan through convertToSingleValueSubq and register the result with
// a left join.
rel = convertToSingleValueSubq(query, converted.r);
subQuery.expr = bb.register(rel, JoinRelType.LEFT);
return;
case SELECT:
// This is used when converting multiset queries:
//
// select * from unnest(select multiset[deptno] from emps);
//
converted =
convertExists(
subQuery.node,
RelOptUtil.SubQueryType.SCALAR,
subQuery.logic,
true,
null);
assert !converted.indicator;
subQuery.expr = bb.register(converted.r, JoinRelType.LEFT);
// This is used when converting window table functions:
//
// select * from table(tumble(table emps, descriptor(deptno), interval '3' DAY))
//
bb.cursors.add(converted.r);
return;
case SET_SEMANTICS_TABLE:
// ----- FLINK MODIFICATION BEGIN -----
// We always expand the SET_SEMANTICS_TABLE due to CALCITE-6204 and to
// reuse some subsequent processing and optimization logic.
// if (!config.isExpand()) {
// return;
// }
// ----- FLINK MODIFICATION END -----
substituteSubQueryOfSetSemanticsInputTable(bb, subQuery);
return;
default:
// Any other kind reaching this method indicates a programming error.
throw new AssertionError("unexpected kind of sub-query: " + subQuery.node);
}
}