private RelNode genGBRelNode()

in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [1125:1223]


    /**
     * Builds the group-by plan on top of {@code srcRel}: a {@link LogicalProject} that evaluates
     * the grouping keys followed by the (de-duplicated) aggregate arguments, and a {@link
     * LogicalAggregate} over that projection.
     *
     * <p>Projected expressions are de-duplicated by their {@code toString()} digest, which is
     * also the key used in the expression-to-position map handed to {@code
     * HiveParserUtils.toAggCall}.
     *
     * @param gbExprs group-by key expressions; each one becomes a leading column of the
     *     projection, in iteration order
     * @param aggInfos aggregate descriptions; their parameters are appended to the projection
     *     unless an identical expression is already projected
     * @param groupSets grouping sets, each encoded as an int that is decoded via {@code
     *     convert(val, numberOfKeys)}; {@code null} or empty means no grouping sets
     * @param srcRel input relational expression
     * @return the aggregate node; when grouping sets are present a {@code GROUPING_ID} call is
     *     appended after the user aggregate calls
     * @throws SemanticException propagated from the expression / aggregate-call conversion
     *     helpers invoked here
     */
    private RelNode genGBRelNode(
            List<ExprNodeDesc> gbExprs,
            List<AggInfo> aggInfos,
            List<Integer> groupSets,
            RelNode srcRel)
            throws SemanticException {
        Map<String, Integer> colNameToPos = relToHiveColNameCalcitePosMap.get(srcRel);
        HiveParserRexNodeConverter converter =
                new HiveParserRexNodeConverter(
                        cluster, srcRel.getRowType(), colNameToPos, 0, false, funcConverter);

        final boolean hasGroupSets = groupSets != null && !groupSets.isEmpty();
        final List<RexNode> gbInputRexNodes = new ArrayList<>();
        final HashMap<String, Integer> inputRexNodeToIndex = new HashMap<>();
        final List<Integer> gbKeyIndices = new ArrayList<>();
        for (ExprNodeDesc key : gbExprs) {
            // also convert null literal here to support grouping by NULLs
            RexNode keyRex = convertNullLiteral(converter.convert(key)).accept(funcConverter);
            // keys are the first projected expressions, so the position is the current size
            int keyIndex = gbInputRexNodes.size();
            gbInputRexNodes.add(keyRex);
            gbKeyIndices.add(keyIndex);
            inputRexNodeToIndex.put(keyRex.toString(), keyIndex);
        }
        final ImmutableBitSet groupSet = ImmutableBitSet.of(gbKeyIndices);

        // Grouping sets: we need to transform them into ImmutableBitSet objects for Calcite
        List<ImmutableBitSet> transformedGroupSets = null;
        if (hasGroupSets) {
            Set<ImmutableBitSet> set = CollectionUtil.newHashSetWithExpectedSize(groupSets.size());
            for (int val : groupSets) {
                set.add(convert(val, groupSet.cardinality()));
            }
            // Calcite expects the grouping sets sorted and without duplicates
            transformedGroupSets = new ArrayList<>(set);
            transformedGroupSets.sort(ImmutableBitSet.COMPARATOR);
        }

        // add Agg parameters to inputs, reusing an already projected identical expression
        for (AggInfo aggInfo : aggInfos) {
            for (ExprNodeDesc expr : aggInfo.getAggParams()) {
                RexNode paramRex = converter.convert(expr).accept(funcConverter);
                String paramDigest = paramRex.toString();
                if (!inputRexNodeToIndex.containsKey(paramDigest)) {
                    inputRexNodeToIndex.put(paramDigest, gbInputRexNodes.size());
                    gbInputRexNodes.add(paramRex);
                }
            }
        }

        // This will happen for count(*), in such cases we arbitrarily pick
        // first element from srcRel.
        // NOTE: this must run BEFORE the projection is created below; adding to the list after
        // LogicalProject.create(...) has consumed it has no effect on the projection.
        if (gbInputRexNodes.isEmpty()) {
            gbInputRexNodes.add(cluster.getRexBuilder().makeInputRef(srcRel, 0));
        }

        // create the actual input before creating agg calls so that the calls can properly infer
        // return type
        RelNode gbInputRel =
                LogicalProject.create(
                        srcRel, Collections.emptyList(), gbInputRexNodes, (List<String>) null);

        List<AggregateCall> aggregateCalls = new ArrayList<>();
        for (AggInfo aggInfo : aggInfos) {
            aggregateCalls.add(
                    HiveParserUtils.toAggCall(
                            aggInfo,
                            converter,
                            inputRexNodeToIndex,
                            groupSet.cardinality(),
                            gbInputRel,
                            cluster,
                            funcConverter));
        }

        // GROUPING__ID is a virtual col in Hive, so we use Flink's function
        if (hasGroupSets) {
            // Create GroupingID column
            AggregateCall aggCall =
                    AggregateCall.create(
                            SqlStdOperatorTable.GROUPING_ID,
                            false,
                            false,
                            false,
                            gbKeyIndices,
                            -1,
                            null,
                            RelCollations.EMPTY,
                            groupSet.cardinality(),
                            gbInputRel,
                            null,
                            null);
            aggregateCalls.add(aggCall);
        }

        return LogicalAggregate.create(
                gbInputRel, ImmutableList.of(), groupSet, transformedGroupSets, aggregateCalls);
    }