private Pair genDistSortBy()

in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [1401:1596]


    /**
     * Generates the plan node for the SORT BY / DISTRIBUTE BY / CLUSTER BY of a query block.
     *
     * <p>Only the first clause name of the query block's parse info is considered. Expressions
     * that do not resolve to a plain input reference are computed by an extra projection placed
     * between {@code srcRel} and the resulting {@link LogicalDistribution}.
     *
     * @param qb the query block whose parse info is inspected
     * @param srcRel the input rel; its row resolver and column position map must already be
     *     registered
     * @param outermostOB if true and an extra projection was needed, the row resolver registered
     *     for the result exposes only the columns of {@code srcRel}; otherwise it also exposes the
     *     projected expression columns
     * @return a pair of (distribution rel, original input). Both are null when the query block
     *     has none of the three clauses. The second element is {@code srcRel} when an extra
     *     projection was inserted and null otherwise.
     * @throws SemanticException if CLUSTER BY is combined with SORT BY or DISTRIBUTE BY, if an
     *     expression cannot be resolved, or if an unexpected null ordering token is found
     */
    private Pair<RelNode, RelNode> genDistSortBy(
            HiveParserQB qb, RelNode srcRel, boolean outermostOB) throws SemanticException {
        HiveParserQBParseInfo qbp = qb.getParseInfo();
        String destClause = qbp.getClauseNames().iterator().next();

        HiveParserASTNode sortAST = qbp.getSortByForClause(destClause);
        HiveParserASTNode distAST = qbp.getDistributeByForClause(destClause);
        HiveParserASTNode clusterAST = qbp.getClusterByForClause(destClause);

        // nothing to do for this query block
        if (sortAST == null && distAST == null && clusterAST == null) {
            return new Pair<>(null, null);
        }

        // expressions that are not plain input refs, and the AST/type they are registered with
        List<RexNode> virtualCols = new ArrayList<>();
        List<Pair<HiveParserASTNode, TypeInfo>> vcASTAndType = new ArrayList<>();
        List<RelFieldCollation> fieldCollations = new ArrayList<>();
        List<Integer> distKeys = new ArrayList<>();

        HiveParserRowResolver inputRR = relToRowResolver.get(srcRel);
        HiveParserRexNodeConverter converter =
                new HiveParserRexNodeConverter(
                        cluster,
                        srcRel.getRowType(),
                        relToHiveColNameCalcitePosMap.get(srcRel),
                        0,
                        false,
                        funcConverter);
        int numSrcFields = srcRel.getRowType().getFieldCount();

        if (clusterAST != null) {
            // handle cluster by
            if (sortAST != null) {
                throw new SemanticException("Cannot have both CLUSTER BY and SORT BY");
            }
            if (distAST != null) {
                throw new SemanticException("Cannot have both CLUSTER BY and DISTRIBUTE BY");
            }
            for (Node node : clusterAST.getChildren()) {
                HiveParserASTNode childAST = (HiveParserASTNode) node;
                int fieldIndex =
                        resolveDistSortFieldIndex(
                                childAST,
                                childAST,
                                "Invalid CLUSTER BY expression: ",
                                inputRR,
                                converter,
                                numSrcFields,
                                virtualCols,
                                vcASTAndType);
                // cluster by doesn't support specifying ASC/DESC or NULLS FIRST/LAST, so use
                // default values
                fieldCollations.add(
                        new RelFieldCollation(
                                fieldIndex,
                                RelFieldCollation.Direction.ASCENDING,
                                RelFieldCollation.NullDirection.FIRST));
                // each cluster by key is both a sort key and a distribution key
                distKeys.add(fieldIndex);
            }
        } else {
            // handle sort by
            if (sortAST != null) {
                for (Node node : sortAST.getChildren()) {
                    // tree shape: ordering node -> null ordering node -> sort expression
                    HiveParserASTNode childAST = (HiveParserASTNode) node;
                    HiveParserASTNode nullOrderAST = (HiveParserASTNode) childAST.getChild(0);
                    HiveParserASTNode fieldAST = (HiveParserASTNode) nullOrderAST.getChild(0);
                    // NOTE(review): a virtual column is registered under the ordering node
                    // (childAST) rather than fieldAST, unlike the other two clauses - confirm
                    // this is intended before changing it.
                    int fieldIndex =
                            resolveDistSortFieldIndex(
                                    fieldAST,
                                    childAST,
                                    "Invalid sort by expression: ",
                                    inputRR,
                                    converter,
                                    numSrcFields,
                                    virtualCols,
                                    vcASTAndType);
                    // anything other than an explicit ASC token is treated as descending
                    RelFieldCollation.Direction direction =
                            childAST.getType() == HiveASTParser.TOK_TABSORTCOLNAMEASC
                                    ? RelFieldCollation.Direction.ASCENDING
                                    : RelFieldCollation.Direction.DESCENDING;
                    RelFieldCollation.NullDirection nullOrder;
                    if (nullOrderAST.getType() == HiveASTParser.TOK_NULLS_FIRST) {
                        nullOrder = RelFieldCollation.NullDirection.FIRST;
                    } else if (nullOrderAST.getType() == HiveASTParser.TOK_NULLS_LAST) {
                        nullOrder = RelFieldCollation.NullDirection.LAST;
                    } else {
                        throw new SemanticException(
                                "Unexpected null ordering option: " + nullOrderAST.getType());
                    }
                    fieldCollations.add(new RelFieldCollation(fieldIndex, direction, nullOrder));
                }
            }
            // handle distribute by
            if (distAST != null) {
                for (Node node : distAST.getChildren()) {
                    HiveParserASTNode childAST = (HiveParserASTNode) node;
                    distKeys.add(
                            resolveDistSortFieldIndex(
                                    childAST,
                                    childAST,
                                    "Invalid DISTRIBUTE BY expression: ",
                                    inputRR,
                                    converter,
                                    numSrcFields,
                                    virtualCols,
                                    vcASTAndType));
                }
            }
        }
        Preconditions.checkState(
                !fieldCollations.isEmpty() || !distKeys.isEmpty(),
                "Both field collations and dist keys are empty");

        // add child SEL if needed
        RelNode realInput = srcRel;
        RelNode originalInput = null;
        HiveParserRowResolver outputRR = new HiveParserRowResolver();
        if (virtualCols.isEmpty()) {
            if (!HiveParserRowResolver.add(outputRR, inputRR)) {
                throw new SemanticException(
                        "Duplicates detected when adding columns to RR: see previous message");
            }
        } else {
            // project all source fields followed by the computed expressions
            List<RexNode> originalInputRefs =
                    srcRel.getRowType().getFieldList().stream()
                            .map(field -> new RexInputRef(field.getIndex(), field.getType()))
                            .collect(Collectors.toList());
            HiveParserRowResolver addedProjectRR = new HiveParserRowResolver();
            if (!HiveParserRowResolver.add(addedProjectRR, inputRR)) {
                throw new SemanticException(
                        "Duplicates detected when adding columns to RR: see previous message");
            }
            // virtual columns get internal names following the existing input columns
            int vColPos = inputRR.getRowSchema().getSignature().size();
            for (Pair<HiveParserASTNode, TypeInfo> astTypePair : vcASTAndType) {
                addedProjectRR.putExpression(
                        astTypePair.getKey(),
                        new ColumnInfo(
                                getColumnInternalName(vColPos),
                                astTypePair.getValue(),
                                null,
                                false));
                vColPos++;
            }
            realInput =
                    genSelectRelNode(
                            CompositeList.of(originalInputRefs, virtualCols),
                            addedProjectRR,
                            srcRel);

            // for the outermost clause the registered row resolver hides the virtual columns
            HiveParserRowResolver exposedRR = outermostOB ? inputRR : addedProjectRR;
            if (!HiveParserRowResolver.add(outputRR, exposedRR)) {
                throw new SemanticException(
                        "Duplicates detected when adding columns to RR: see previous message");
            }
            originalInput = srcRel;
        }

        // create rel node
        RelTraitSet traitSet = cluster.traitSet();
        RelCollation canonizedCollation = traitSet.canonize(RelCollations.of(fieldCollations));
        RelNode res = LogicalDistribution.create(realInput, canonizedCollation, distKeys);

        Map<String, Integer> hiveColNameCalcitePosMap = buildHiveToCalciteColumnMap(outputRR);
        relToRowResolver.put(res, outputRR);
        relToHiveColNameCalcitePosMap.put(res, hiveColNameCalcitePosMap);

        return new Pair<>(res, originalInput);
    }

    /**
     * Resolves one SORT BY / DISTRIBUTE BY / CLUSTER BY expression to a field index.
     *
     * <p>If the expression converts to a {@link RexInputRef}, the referenced index is returned.
     * Otherwise the converted expression is appended to {@code virtualCols}, recorded in {@code
     * vcASTAndType} under {@code registeredAST}, and the index it will occupy after the source
     * fields is returned.
     *
     * @param exprAST the AST of the expression to resolve
     * @param registeredAST the AST under which a virtual column is recorded
     * @param invalidExprMsgPrefix prefix of the error message used when resolution fails
     * @param inputRR row resolver of the input rel
     * @param converter converter bound to the input rel's row type
     * @param numSrcFields number of fields of the input rel
     * @param virtualCols accumulator for expressions needing projection; appended to
     * @param vcASTAndType accumulator of AST/type pairs parallel to {@code virtualCols}
     * @return the index of the field holding the expression value
     * @throws SemanticException if no expression descriptor is generated for {@code exprAST}
     */
    private int resolveDistSortFieldIndex(
            HiveParserASTNode exprAST,
            HiveParserASTNode registeredAST,
            String invalidExprMsgPrefix,
            HiveParserRowResolver inputRR,
            HiveParserRexNodeConverter converter,
            int numSrcFields,
            List<RexNode> virtualCols,
            List<Pair<HiveParserASTNode, TypeInfo>> vcASTAndType)
            throws SemanticException {
        Map<HiveParserASTNode, ExprNodeDesc> astToExprNodeDesc =
                semanticAnalyzer.genAllExprNodeDesc(exprAST, inputRR);
        ExprNodeDesc exprNodeDesc = astToExprNodeDesc.get(exprAST);
        if (exprNodeDesc == null) {
            throw new SemanticException(invalidExprMsgPrefix + exprAST);
        }
        RexNode rexNode = converter.convert(exprNodeDesc).accept(funcConverter);
        if (rexNode instanceof RexInputRef) {
            return ((RexInputRef) rexNode).getIndex();
        }
        int fieldIndex = numSrcFields + virtualCols.size();
        virtualCols.add(rexNode);
        vcASTAndType.add(new Pair<>(registeredAST, exprNodeDesc.getTypeInfo()));
        return fieldIndex;
    }