in flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java [6147:6388]
/**
 * Translates one aggregate call into an {@link AggregateCall} and records the resulting
 * expression in {@code aggMapping} under {@code outerCall}.
 *
 * <p>Wrapper calls (FILTER, WITHIN DISTINCT, WITHIN GROUP, IGNORE NULLS / RESPECT NULLS) and
 * several function-specific syntaxes (COUNTIF, STRING_AGG, GROUP_CONCAT, ARRAY_AGG / ARRAY_CONCAT_AGG
 * with a trailing ORDER BY list) are handled by recursing with the unwrapped or rewritten call
 * and the extracted piece passed in the matching parameter. Only a call that reaches the end
 * of the switch is actually converted.
 *
 * @param call the call to translate; may be a wrapper around the real aggregate call
 * @param filter the filter condition collected so far, or null if there is none
 * @param distinctList the WITHIN DISTINCT key list collected so far, or null if there is none
 * @param orderList the ORDER BY list collected so far, or null if there is none
 * @param ignoreNulls whether an IGNORE NULLS wrapper has been seen
 * @param outerCall the outermost call; must not be null. It is used as the key into
 *     {@code aggMapping}, and its string form as the key into {@code nameMap}
 */
private void translateAgg(
SqlCall call,
@Nullable SqlNode filter,
@Nullable SqlNodeList distinctList,
@Nullable SqlNodeList orderList,
boolean ignoreNulls,
SqlCall outerCall) {
assert bb.agg == this;
assert outerCall != null;
final List<SqlNode> operands = call.getOperandList();
final SqlParserPos pos = call.getParserPosition();
// Rewritten call used by the cases that translate one syntax into another.
final SqlCall call2;
switch (call.getKind()) {
case FILTER:
// operand(0) is the wrapped call, operand(1) becomes the filter.
assert filter == null;
translateAgg(
call.operand(0),
call.operand(1),
distinctList,
orderList,
ignoreNulls,
outerCall);
return;
case WITHIN_DISTINCT:
// operand(0) is the wrapped call, operand(1) becomes the distinct list.
// NOTE(review): this asserts on orderList although the value being replaced is
// distinctList -- confirm whether "distinctList == null" was intended.
assert orderList == null;
translateAgg(
call.operand(0),
filter,
call.operand(1),
orderList,
ignoreNulls,
outerCall);
return;
case WITHIN_GROUP:
// operand(0) is the wrapped call, operand(1) becomes the order list.
assert orderList == null;
translateAgg(
call.operand(0),
filter,
distinctList,
call.operand(1),
ignoreNulls,
outerCall);
return;
case IGNORE_NULLS:
ignoreNulls = true;
// fall through
case RESPECT_NULLS:
// Unwrap; ignoreNulls is passed through unchanged when arriving here directly.
translateAgg(
call.operand(0),
filter,
distinctList,
orderList,
ignoreNulls,
outerCall);
return;
case COUNTIF:
// COUNTIF(b) ==> COUNT(*) FILTER (WHERE b)
// COUNTIF(b) FILTER (WHERE b2) ==> COUNT(*) FILTER (WHERE b2 AND b)
call2 = SqlStdOperatorTable.COUNT.createCall(pos, SqlIdentifier.star(pos));
final SqlNode filter2 = SqlUtil.andExpressions(filter, call.operand(0));
translateAgg(call2, filter2, distinctList, orderList, ignoreNulls, outerCall);
return;
case STRING_AGG:
// Translate "STRING_AGG(s, sep ORDER BY x, y)"
// as if it were "LISTAGG(s, sep) WITHIN GROUP (ORDER BY x, y)";
// and "STRING_AGG(s, sep)" as "LISTAGG(s, sep)".
// Note: operands2 is declared here but is also assigned in the GROUP_CONCAT case below.
final List<SqlNode> operands2;
if (!operands.isEmpty() && Util.last(operands) instanceof SqlNodeList) {
// A trailing SqlNodeList operand is the ORDER BY list; strip it from the operands.
orderList = (SqlNodeList) Util.last(operands);
operands2 = Util.skipLast(operands);
} else {
operands2 = operands;
}
call2 =
SqlStdOperatorTable.LISTAGG.createCall(
call.getFunctionQuantifier(), pos, operands2);
translateAgg(call2, filter, distinctList, orderList, ignoreNulls, outerCall);
return;
case GROUP_CONCAT:
// Translate "GROUP_CONCAT(s ORDER BY x, y SEPARATOR ',')"
// as if it were "LISTAGG(s, ',') WITHIN GROUP (ORDER BY x, y)".
// To do this, build a list of operands without ORDER BY but with the separator.
operands2 = new ArrayList<>(operands);
final SqlNode separator;
if (!operands2.isEmpty()
&& Util.last(operands2).getKind() == SqlKind.SEPARATOR) {
// operands2 is still an unmodified copy of operands here, so operands.size() - 1
// is also the index of the last element of operands2.
final SqlCall sepCall = (SqlCall) operands2.remove(operands.size() - 1);
separator = sepCall.operand(0);
} else {
separator = null;
}
// After the separator is removed, a trailing SqlNodeList is the ORDER BY list.
if (!operands2.isEmpty() && Util.last(operands2) instanceof SqlNodeList) {
orderList = (SqlNodeList) operands2.remove(operands2.size() - 1);
}
// Re-append the bare separator expression as the last LISTAGG operand.
if (separator != null) {
operands2.add(separator);
}
call2 =
SqlStdOperatorTable.LISTAGG.createCall(
call.getFunctionQuantifier(), pos, operands2);
translateAgg(call2, filter, distinctList, orderList, ignoreNulls, outerCall);
return;
case ARRAY_AGG:
case ARRAY_CONCAT_AGG:
// Translate "ARRAY_AGG(s ORDER BY x, y)"
// as if it were "ARRAY_AGG(s) WITHIN GROUP (ORDER BY x, y)";
// similarly "ARRAY_CONCAT_AGG".
if (!operands.isEmpty() && Util.last(operands) instanceof SqlNodeList) {
orderList = (SqlNodeList) Util.last(operands);
// Same operator and quantifier, minus the trailing ORDER BY operand.
call2 =
call.getOperator()
.createCall(
call.getFunctionQuantifier(),
pos,
Util.skipLast(operands));
translateAgg(
call2, filter, distinctList, orderList, ignoreNulls, outerCall);
return;
}
// "ARRAY_AGG" and "ARRAY_CONCAT_AGG" without "ORDER BY"
// are handled normally; fall through.
default:
break;
}
// From here on, "call" is converted directly.
// args collects one index per operand, as returned by lookupOrCreateGroupExpr.
final List<Integer> args = new ArrayList<>();
// -1 is kept when there is no filter.
int filterArg = -1;
final ImmutableBitSet distinctKeys;
try {
// switch out of agg mode
bb.agg = null;
// ----- FLINK MODIFICATION BEGIN -----
// Operands are taken from a FlinkSqlCallBinding built over the call rather than
// from the call's own operand list.
FlinkSqlCallBinding binding =
new FlinkSqlCallBinding(validator(), aggregatingSelectScope, call);
List<SqlNode> sqlNodes = binding.operands();
for (int i = 0; i < sqlNodes.size(); i++) {
SqlNode operand = sqlNodes.get(i);
// special case for COUNT(*): delete the *
if (operand instanceof SqlIdentifier) {
SqlIdentifier id = (SqlIdentifier) operand;
if (id.isStar()) {
// The star must be the only operand; leave args empty and stop.
assert call.operandCount() == 1;
assert args.isEmpty();
break;
}
}
RexNode convertedExpr = bb.convertExpression(operand);
args.add(lookupOrCreateGroupExpr(convertedExpr));
}
// ----- FLINK MODIFICATION END -----
if (filter != null) {
RexNode convertedExpr = bb.convertExpression(filter);
// A nullable filter is wrapped in IS TRUE before it is registered.
if (convertedExpr.getType().isNullable()) {
convertedExpr =
rexBuilder.makeCall(SqlStdOperatorTable.IS_TRUE, convertedExpr);
}
filterArg = lookupOrCreateGroupExpr(convertedExpr);
}
if (distinctList == null) {
distinctKeys = null;
} else {
// Collect the index of every WITHIN DISTINCT key into a bit set.
final ImmutableBitSet.Builder distinctBuilder = ImmutableBitSet.builder();
for (SqlNode distinct : distinctList) {
RexNode e = bb.convertExpression(distinct);
assert e != null;
distinctBuilder.set(lookupOrCreateGroupExpr(e));
}
distinctKeys = distinctBuilder.build();
}
} finally {
// switch back into agg mode
bb.agg = this;
}
SqlAggFunction aggFunction = (SqlAggFunction) call.getOperator();
final RelDataType type = validator().deriveType(bb.scope(), call);
// distinct is set only when the call carries an explicit DISTINCT quantifier ...
boolean distinct = false;
SqlLiteral quantifier = call.getFunctionQuantifier();
if ((null != quantifier) && (quantifier.getValue() == SqlSelectKeyword.DISTINCT)) {
distinct = true;
}
// ... or when APPROX_COUNT_DISTINCT is rewritten to COUNT with the distinct and
// approximate flags both set.
boolean approximate = false;
if (aggFunction == SqlStdOperatorTable.APPROX_COUNT_DISTINCT) {
aggFunction = SqlStdOperatorTable.COUNT;
distinct = true;
approximate = true;
}
final RelCollation collation;
if (orderList == null || orderList.size() == 0) {
collation = RelCollations.EMPTY;
} else {
try {
// switch out of agg mode
bb.agg = null;
// Convert each ORDER BY item, passing ASCENDING and UNSPECIFIED as the
// direction and null direction arguments.
collation =
RelCollations.of(
orderList.stream()
.map(
order ->
bb.convertSortExpression(
order,
RelFieldCollation.Direction
.ASCENDING,
RelFieldCollation.NullDirection
.UNSPECIFIED,
this::sortToFieldCollation))
.collect(Collectors.toList()));
} finally {
// switch back into agg mode
bb.agg = this;
}
}
// The last argument is looked up in nameMap by the string form of the outermost call.
final AggregateCall aggCall =
AggregateCall.create(
aggFunction,
distinct,
approximate,
ignoreNulls,
args,
filterArg,
distinctKeys,
collation,
type,
nameMap.get(outerCall.toString()));
// Register the aggregate call; the lambda reports whether converted input expression i
// has a nullable type.
RexNode rex =
rexBuilder.addAggCall(
aggCall,
groupExprs.size(),
aggCalls,
aggCallMapping,
i -> convertedInputExprs.get(i).left.getType().isNullable());
// Record the result under the outermost call, not the unwrapped one.
aggMapping.put(outerCall, rex);
}