in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [1125:1223]
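// Translates a Hive GROUP BY (optionally with grouping sets) into a Calcite
// LogicalAggregate: group keys and aggregate arguments are projected below the
// aggregate, then one AggregateCall is built per aggregation.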
private RelNode genGBRelNode(
List<ExprNodeDesc> gbExprs,
List<AggInfo> aggInfos,
List<Integer> groupSets,
RelNode srcRel)
throws SemanticException {
Map<String, Integer> colNameToPos = relToHiveColNameCalcitePosMap.get(srcRel);
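        // converts Hive ExprNodeDesc trees into RexNodes resolved against srcRel's row type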
HiveParserRexNodeConverter converter =
new HiveParserRexNodeConverter(
cluster, srcRel.getRowType(), colNameToPos, 0, false, funcConverter);
final boolean hasGroupSets = groupSets != null && !groupSets.isEmpty();
final List<RexNode> gbInputRexNodes = new ArrayList<>();
final HashMap<String, Integer> inputRexNodeToIndex = new HashMap<>();
final List<Integer> gbKeyIndices = new ArrayList<>();
int inputIndex = 0;
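        // group keys are projected first, so they occupy input indices 0..n-1; each key
        // is also remembered by its textual form for later de-duplication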
for (ExprNodeDesc key : gbExprs) {
// also convert null literal here to support grouping by NULLs
RexNode keyRex = convertNullLiteral(converter.convert(key)).accept(funcConverter);
gbInputRexNodes.add(keyRex);
gbKeyIndices.add(inputIndex);
inputRexNodeToIndex.put(keyRex.toString(), inputIndex);
inputIndex++;
}
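        // the full group set spans all keys; any grouping sets below are subsets of it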
final ImmutableBitSet groupSet = ImmutableBitSet.of(gbKeyIndices);
// Grouping sets: we need to transform them into ImmutableBitSet objects for Calcite
List<ImmutableBitSet> transformedGroupSets = null;
if (hasGroupSets) {
Set<ImmutableBitSet> set = CollectionUtil.newHashSetWithExpectedSize(groupSets.size());
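            // each grouping set arrives as an integer bitmask over the group-by keys;
            // convert(...) expands the mask into an ImmutableBitSet of key positions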
for (int val : groupSets) {
set.add(convert(val, groupSet.cardinality()));
}
// Calcite expects the grouping sets sorted and without duplicates
transformedGroupSets = new ArrayList<>(set);
transformedGroupSets.sort(ImmutableBitSet.COMPARATOR);
}
// add Agg parameters to inputs
for (AggInfo aggInfo : aggInfos) {
for (ExprNodeDesc expr : aggInfo.getAggParams()) {
RexNode paramRex = converter.convert(expr).accept(funcConverter);
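                // reuse the input slot if an identical expression (same textual form)
                // was already projected, e.g. a group key that reappears as an agg argument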
Integer argIndex = inputRexNodeToIndex.get(paramRex.toString());
if (argIndex == null) {
argIndex = gbInputRexNodes.size();
inputRexNodeToIndex.put(paramRex.toString(), argIndex);
gbInputRexNodes.add(paramRex);
}
}
}
        if (gbInputRexNodes.isEmpty()) {
            // This happens for count(*); arbitrarily project the first column of srcRel
            // so that the aggregate input has at least one column. This must happen before
            // the LogicalProject is created, since Calcite copies the expression list.
            gbInputRexNodes.add(cluster.getRexBuilder().makeInputRef(srcRel, 0));
        }
        // create the actual input before creating agg calls so that the calls can properly infer
        // return type
        RelNode gbInputRel =
                LogicalProject.create(
                        srcRel, Collections.emptyList(), gbInputRexNodes, (List<String>) null);
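        // translate each Hive aggregation into a Calcite AggregateCall; the input rel
        // created above lets toAggCall infer the calls' return types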
List<AggregateCall> aggregateCalls = new ArrayList<>();
for (AggInfo aggInfo : aggInfos) {
aggregateCalls.add(
HiveParserUtils.toAggCall(
aggInfo,
converter,
inputRexNodeToIndex,
groupSet.cardinality(),
gbInputRel,
cluster,
funcConverter));
}
        // GROUPING__ID is a virtual column in Hive; with grouping sets we materialize it
        // as an explicit GROUPING_ID aggregate call
if (hasGroupSets) {
// Create GroupingID column
            AggregateCall aggCall =
                    AggregateCall.create(
                            SqlStdOperatorTable.GROUPING_ID,
                            false, // not distinct
                            false, // not approximate
                            false, // do not ignore nulls
                            gbKeyIndices, // arguments: every group-by key
                            -1, // no FILTER clause
                            null, // no distinct keys
                            RelCollations.EMPTY, // no WITHIN GROUP collation
                            groupSet.cardinality(), // number of group keys
                            gbInputRel,
                            null, // let Calcite infer the return type
                            null); // no explicit name
aggregateCalls.add(aggCall);
}
return LogicalAggregate.create(
gbInputRel, ImmutableList.of(), groupSet, transformedGroupSets, aggregateCalls);
}
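// For orientation (a sketch, not verified planner output): for
// "SELECT a, count(b) FROM t GROUP BY a" this method produces roughly
//   LogicalAggregate(group=[{0}], agg#0=[COUNT($1)])
//     LogicalProject(a=[$0], b=[$1])
// on top of the relational node for t, with GROUPING_ID appended as an
// extra call only when grouping sets are present.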