private RelNode genGBLogicalPlan()

in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [1226:1396]


    /**
     * Builds the group-by relational node for the first clause of {@code qb}, using {@code
     * srcRel} as its input.
     *
     * <p>When the select list is a {@code TOK_SELECTDI} whose only expression is a single {@code
     * TOK_ALLCOLREF}, a select plan is generated first and the clause's select expression is
     * replaced by the AST produced by {@code HiveParserUtils.genSelectDIAST}; the group-by is
     * then built on top of that select plan.
     *
     * <p>On success the new node is registered in {@code relToHiveColNameCalcitePosMap} and
     * {@code relToRowResolver} together with its output row resolver.
     *
     * @param qb query block whose parse info supplies the group-by and aggregation ASTs
     * @param srcRel relational node the group-by is built on
     * @return the group-by node, or {@code null} when the query is a select distinct with
     *     windowing specs, or when the clause has neither group-by expressions nor aggregations
     * @throws SemanticException if {@code HIVEGROUPBYSKEW} is enabled and the clause has more
     *     than one distinct function expression, or if a group-by expression cannot be
     *     translated into an expression descriptor
     */
    private RelNode genGBLogicalPlan(HiveParserQB qb, RelNode srcRel) throws SemanticException {
        HiveParserQBParseInfo parseInfo = qb.getParseInfo();

        // 1. Gather GB Expressions (AST) (GB + Aggregations)
        // NOTE: Multi Insert is not supported, hence only the first clause name is used
        String clauseName = parseInfo.getClauseNames().iterator().next();
        HiveParserASTNode selectList = qb.getParseInfo().getSelForClause(clauseName);
        HiveParserSubQueryUtils.checkForTopLevelSubqueries(selectList);

        // Input of the group-by; swapped for a generated select plan in the branch below.
        RelNode inputRel = srcRel;
        if (selectList.getToken().getType() == HiveASTParser.TOK_SELECTDI
                && selectList.getChildCount() == 1
                && selectList.getChild(0).getChildCount() == 1) {
            HiveParserASTNode onlyExpr = (HiveParserASTNode) selectList.getChild(0).getChild(0);
            if (onlyExpr.getToken().getType() == HiveASTParser.TOK_ALLCOLREF) {
                inputRel = genSelectLogicalPlan(qb, inputRel, inputRel, null, null);
                HiveParserRowResolver selectRR = relToRowResolver.get(inputRel);
                parseInfo.setSelExprForClause(
                        clauseName, HiveParserUtils.genSelectDIAST(selectRR));
            }
        }

        // Select DISTINCT + windowing; GBy handled by genSelectForWindowing
        boolean distinctWithWindowing =
                selectList.getToken().getType() == HiveASTParser.TOK_SELECTDI
                        && !qb.getAllWindowingSpecs().isEmpty();
        if (distinctWithWindowing) {
            return null;
        }

        List<HiveParserASTNode> groupByAsts = getGroupByForClause(parseInfo, clauseName);
        HashMap<String, HiveParserASTNode> aggregationAsts =
                parseInfo.getAggregationExprsForClause(clauseName);
        boolean hasGroupByKeys = !groupByAsts.isEmpty();
        boolean hasAggregations = !(aggregationAsts == null || aggregationAsts.isEmpty());

        final boolean usesGroupingSets =
                !(parseInfo.getDestRollups().isEmpty()
                        && parseInfo.getDestGroupingSets().isEmpty()
                        && parseInfo.getDestCubes().isEmpty());

        // 2. Sanity check
        if (semanticAnalyzer.getConf().getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
            if (parseInfo.getDistinctFuncExprsForClause(clauseName).size() > 1) {
                throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.getMsg());
            }
        }

        // Nothing to group and nothing to aggregate: no group-by node is created.
        if (!hasGroupByKeys && !hasAggregations) {
            return null;
        }

        ArrayList<ExprNodeDesc> groupKeyDescs = new ArrayList<>();
        ArrayList<String> outputNames = new ArrayList<>();

        // 3. Input, Output Row Resolvers
        HiveParserRowResolver inputRR = relToRowResolver.get(inputRel);
        HiveParserRowResolver outputRR = new HiveParserRowResolver();
        outputRR.setIsExprResolver(true);

        // 4. Construct GB Keys (ExprNode); the loop is a no-op when there are no GB expressions
        for (HiveParserASTNode groupByAst : groupByAsts) {
            ExprNodeDesc keyDesc =
                    semanticAnalyzer.genAllExprNodeDesc(groupByAst, inputRR).get(groupByAst);
            if (keyDesc == null) {
                throw new SemanticException("Invalid Column Reference: " + groupByAst.dump());
            }
            addToGBExpr(outputRR, inputRR, groupByAst, keyDesc, groupKeyDescs, outputNames);
        }

        // 5. GroupingSets, Cube, Rollup
        int groupKeyCount = groupKeyDescs.size();
        List<Integer> groupingSets = null;
        if (usesGroupingSets) {
            if (parseInfo.getDestRollups().contains(clauseName)) {
                groupingSets = getGroupingSetsForRollup(groupByAsts.size());
            } else if (parseInfo.getDestCubes().contains(clauseName)) {
                groupingSets = getGroupingSetsForCube(groupByAsts.size());
            } else if (parseInfo.getDestGroupingSets().contains(clauseName)) {
                groupingSets = getGroupingSets(groupByAsts, parseInfo, clauseName);
            }
        }

        // 6. Construct aggregation function Info
        ArrayList<AggInfo> aggInfos = new ArrayList<>();
        if (hasAggregations) {
            for (HiveParserASTNode aggAst : aggregationAsts.values()) {
                // 6.1 Determine type of UDAF; the first child carries the GenericUDAF name
                String udafName = unescapeIdentifier(aggAst.getChild(0).getText());
                int aggTokenType = aggAst.getType();
                boolean distinct = aggTokenType == HiveASTParser.TOK_FUNCTIONDI;
                boolean allColumns = aggTokenType == HiveASTParser.TOK_FUNCTIONSTAR;

                // 6.2 Convert UDAF Params to ExprNodeDesc (children after the function name)
                ArrayList<ExprNodeDesc> udafParams = new ArrayList<>();
                for (int childIdx = 1; childIdx < aggAst.getChildCount(); childIdx++) {
                    HiveParserASTNode paramAst = (HiveParserASTNode) aggAst.getChild(childIdx);
                    udafParams.add(semanticAnalyzer.genExprNodeDesc(paramAst, inputRR));
                }

                GenericUDAFEvaluator.Mode udafMode =
                        HiveParserUtils.groupByDescModeToUDAFMode(
                                GroupByDesc.Mode.COMPLETE, distinct);
                GenericUDAFEvaluator evaluator =
                        HiveParserUtils.getGenericUDAFEvaluator(
                                udafName,
                                udafParams,
                                aggAst,
                                distinct,
                                allColumns,
                                frameworkConfig.getOperatorTable());
                assert (evaluator != null);
                HiveParserBaseSemanticAnalyzer.GenericUDAFInfo udafInfo =
                        HiveParserUtils.getGenericUDAFInfo(evaluator, udafMode, udafParams);

                // An alias is taken from the second child of a two-child TOK_SELEXPR parent.
                String alias = null;
                boolean parentCarriesAlias =
                        aggAst.getParent().getType() == HiveASTParser.TOK_SELEXPR
                                && aggAst.getParent().getChildCount() == 2;
                if (parentCarriesAlias) {
                    alias =
                            unescapeIdentifier(
                                    aggAst.getParent().getChild(1).getText().toLowerCase());
                }

                AggInfo aggInfo =
                        new AggInfo(
                                udafParams,
                                udafInfo.returnType,
                                udafName,
                                distinct,
                                allColumns,
                                alias);
                aggInfos.add(aggInfo);

                // Without an alias, fall back to the internal name for this output position.
                String outputName;
                if (alias != null) {
                    outputName = alias;
                } else {
                    outputName = getColumnInternalName(groupKeyCount + aggInfos.size() - 1);
                }
                outputNames.add(outputName);
                outputRR.putExpression(
                        aggAst, new ColumnInfo(outputName, aggInfo.getReturnType(), "", false));
            }
        }

        // 7. If GroupingSets, Cube, Rollup were used, we account grouping__id
        // GROUPING__ID is also required by the GROUPING function, so let's always add it for
        // grouping sets
        if (groupingSets != null && !groupingSets.isEmpty()) {
            String groupingIdName = getColumnInternalName(groupKeyCount + aggInfos.size());
            outputNames.add(groupingIdName);
            outputRR.put(
                    null,
                    VirtualColumn.GROUPINGID.getName(),
                    new ColumnInfo(
                            groupingIdName,
                            // flink grouping_id's return type is bigint
                            TypeInfoFactory.longTypeInfo,
                            null,
                            true));
        }

        // 8. We create the group_by operator and register its resolver and column mapping
        RelNode groupByRel = genGBRelNode(groupKeyDescs, aggInfos, groupingSets, inputRel);
        relToHiveColNameCalcitePosMap.put(groupByRel, buildHiveToCalciteColumnMap(outputRR));
        relToRowResolver.put(groupByRel, outputRR);
        return groupByRel;
    }