private RelNode genJoinRelNode()

in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [462:690]


    /**
     * Generates the Calcite join of {@code leftRel} and {@code rightRel} and registers the row
     * resolver and the Hive-column-to-Calcite-position map of the returned node.
     *
     * <p>The join condition AST is first translated into a {@link RexNode} (a USING clause is
     * rewritten into an equivalent ON clause beforehand, except for left semi joins), then the
     * join node is created. For a left semi join the row resolver of the result only contains
     * the columns of the left input.
     *
     * @param leftRel left input of the join; must already be registered in {@code
     *     relToRowResolver}
     * @param leftTableAlias alias used to qualify left-side columns when a USING clause is
     *     expanded
     * @param rightRel right input of the join; must already be registered in {@code
     *     relToRowResolver}
     * @param rightTableAlias alias used to qualify right-side columns when a USING clause is
     *     expanded
     * @param hiveJoinType Hive join type; every type other than LEFTOUTER, RIGHTOUTER, FULLOUTER
     *     and LEFTSEMI is planned as an inner join
     * @param joinCondAst AST of the join condition, or {@code null} in which case the literal
     *     {@code TRUE} is used as condition
     * @return the join node, or for a left semi join whose left input had to be extended, a
     *     project on top of the join that only keeps the fields of {@code leftRel}
     * @throws SemanticException if type checking the join condition reports an error
     */
    private RelNode genJoinRelNode(
            RelNode leftRel,
            String leftTableAlias,
            RelNode rightRel,
            String rightTableAlias,
            JoinType hiveJoinType,
            HiveParserASTNode joinCondAst)
            throws SemanticException {
        HiveParserRowResolver leftRR = relToRowResolver.get(leftRel);
        HiveParserRowResolver rightRR = relToRowResolver.get(rightRel);

        // 1. Construct the RexNode representing the join condition
        RexNode joinCondRex;
        // stays null unless the join was written with a USING clause
        List<String> namedColumns = null;
        if (joinCondAst == null) {
            joinCondRex = cluster.getRexBuilder().makeLiteral(true);
        } else {
            HiveParserJoinTypeCheckCtx jCtx =
                    new HiveParserJoinTypeCheckCtx(
                            leftRR, rightRR, hiveJoinType, frameworkConfig, cluster);
            jCtx.setUnparseTranslator(semanticAnalyzer.unparseTranslator);
            // NOTE(review): only read by the unparse branch below, but deliberately still built
            // up-front so that anything getCombinedRR reports happens at the same point as before
            HiveParserRowResolver combinedRR = HiveParserRowResolver.getCombinedRR(leftRR, rightRR);
            if (joinCondAst.getType() == HiveASTParser.TOK_TABCOLNAME
                    && !hiveJoinType.equals(JoinType.LEFTSEMI)) {
                namedColumns = new ArrayList<>();
                joinCondAst =
                        rewriteUsingClauseAsOnClause(
                                joinCondAst, leftTableAlias, rightTableAlias, namedColumns);
            } else if (semanticAnalyzer.unparseTranslator != null
                    && semanticAnalyzer.unparseTranslator.isEnabled()) {
                semanticAnalyzer.genAllExprNodeDesc(joinCondAst, combinedRR, jCtx);
            }
            joinCondRex = convertJoinCondToRex(joinCondAst, jCtx, leftRel, rightRel);
        }

        // 2. Construct Join Rel Node and HiveParserRowResolver for the new Join Node
        JoinRelType calciteJoinType = toCalciteJoinRelType(hiveJoinType);

        RelNode topRel;
        HiveParserRowResolver topRR;
        if (calciteJoinType == JoinRelType.SEMI) {
            topRel = genLeftSemiJoinRelNode(leftRel, rightRel, leftRR, joinCondRex, calciteJoinType);
            // only the columns of the left input are resolvable on top of a semi join
            topRR = new HiveParserRowResolver();
            addColumnsWarnOnDuplicates(topRR, leftRR);
        } else {
            topRel =
                    LogicalJoin.create(
                            leftRel,
                            rightRel,
                            Collections.emptyList(),
                            joinCondRex,
                            Collections.emptySet(),
                            calciteJoinType);
            topRR = HiveParserRowResolver.getCombinedRR(leftRR, rightRR);
            if (namedColumns != null) {
                List<String> tableAliases = new ArrayList<>();
                tableAliases.add(leftTableAlias);
                tableAliases.add(rightTableAlias);
                topRR.setNamedJoinInfo(
                        new HiveParserNamedJoinInfo(tableAliases, namedColumns, hiveJoinType));
            }
        }

        relToHiveColNameCalcitePosMap.put(topRel, buildHiveToCalciteColumnMap(topRR));
        relToRowResolver.put(topRel, topRR);
        return topRel;
    }

    /**
     * Turns the column list of a USING clause into an ON-clause AST of the form {@code
     * left.col = right.col [and ...]} and collects the column names into {@code namedColumns}.
     *
     * @return the single equality node if exactly one column was listed, otherwise the {@code
     *     and} node holding one equality per column
     */
    private HiveParserASTNode rewriteUsingClauseAsOnClause(
            HiveParserASTNode usingAst,
            String leftTableAlias,
            String rightTableAlias,
            List<String> namedColumns)
            throws SemanticException {
        // We will transform using clause and make it look like an on-clause.
        // So, lets generate a valid on-clause AST from using.
        HiveParserASTNode and =
                (HiveParserASTNode) HiveASTParseDriver.ADAPTOR.create(HiveASTParser.KW_AND, "and");
        HiveParserASTNode equal = null;
        int count = 0;
        for (Node child : usingAst.getChildren()) {
            String columnName = ((HiveParserASTNode) child).getText();
            // dealing with views
            if (semanticAnalyzer.unparseTranslator != null
                    && semanticAnalyzer.unparseTranslator.isEnabled()) {
                semanticAnalyzer.unparseTranslator.addIdentifierTranslation(
                        (HiveParserASTNode) child);
            }
            namedColumns.add(columnName);
            HiveParserASTNode left = HiveParserASTBuilder.qualifiedName(leftTableAlias, columnName);
            HiveParserASTNode right =
                    HiveParserASTBuilder.qualifiedName(rightTableAlias, columnName);
            equal =
                    (HiveParserASTNode)
                            HiveASTParseDriver.ADAPTOR.create(HiveASTParser.EQUAL, "=");
            HiveASTParseDriver.ADAPTOR.addChild(equal, left);
            HiveASTParseDriver.ADAPTOR.addChild(equal, right);
            HiveASTParseDriver.ADAPTOR.addChild(and, equal);
            count++;
        }
        // a lone equality is used directly instead of being wrapped into the "and" node
        return count > 1 ? and : equal;
    }

    /**
     * Type checks the join condition AST and converts the resulting expression into a {@link
     * RexNode} over the fields of {@code leftRel} followed by {@code rightRel}.
     *
     * @throws SemanticException if the type check context recorded an error
     */
    private RexNode convertJoinCondToRex(
            HiveParserASTNode joinCondAst,
            HiveParserJoinTypeCheckCtx jCtx,
            RelNode leftRel,
            RelNode rightRel)
            throws SemanticException {
        Map<HiveParserASTNode, ExprNodeDesc> exprNodes =
                HiveParserUtils.genExprNode(joinCondAst, jCtx);
        if (jCtx.getError() != null) {
            throw new SemanticException(
                    generateErrorMessage(jCtx.getErrorSrcNode(), jCtx.getError()));
        }
        ExprNodeDesc joinCondExprNode = exprNodes.get(joinCondAst);
        List<RelNode> inputRels = new ArrayList<>();
        inputRels.add(leftRel);
        inputRels.add(rightRel);
        return HiveParserRexNodeConverter.convert(
                        cluster,
                        joinCondExprNode,
                        inputRels,
                        relToRowResolver,
                        relToHiveColNameCalcitePosMap,
                        false,
                        funcConverter)
                .accept(funcConverter);
    }

    /** Maps a Hive join type to the Calcite join type; unlisted types are treated as inner. */
    private JoinRelType toCalciteJoinRelType(JoinType hiveJoinType) {
        switch (hiveJoinType) {
            case LEFTOUTER:
                return JoinRelType.LEFT;
            case RIGHTOUTER:
                return JoinRelType.RIGHT;
            case FULLOUTER:
                return JoinRelType.FULL;
            case LEFTSEMI:
                return JoinRelType.SEMI;
            case INNER:
            default:
                return JoinRelType.INNER;
        }
    }

    /**
     * Creates the join node of a left semi join.
     *
     * <p>{@code projectNonColumnEquiConditions} may replace the inputs (detected here by identity
     * of the left input). When it does, the join over the extended inputs is registered with a
     * row resolver that also covers the added left columns, and a project that only keeps the
     * fields of {@code leftRel} is put on top of it.
     *
     * @return the join, or the project on top of it if the left input was replaced; the returned
     *     node is NOT registered here, that is left to the caller
     */
    private RelNode genLeftSemiJoinRelNode(
            RelNode leftRel,
            RelNode rightRel,
            HiveParserRowResolver leftRR,
            RexNode joinCondRex,
            JoinRelType calciteJoinType)
            throws SemanticException {
        List<RelDataTypeField> sysFieldList = new ArrayList<>();
        List<RexNode> leftJoinKeys = new ArrayList<>();
        List<RexNode> rightJoinKeys = new ArrayList<>();

        RexNode nonEquiConds =
                HiveRelOptUtil.splitHiveJoinCondition(
                        sysFieldList,
                        Arrays.asList(leftRel, rightRel),
                        joinCondRex,
                        Arrays.asList(leftJoinKeys, rightJoinKeys),
                        null,
                        null);

        // the array is handed over so that the callee can swap in modified inputs
        RelNode[] inputRels = new RelNode[] {leftRel, rightRel};
        final List<Integer> leftKeys = new ArrayList<>();
        final List<Integer> rightKeys = new ArrayList<>();
        RexNode remainingEquiCond =
                HiveParserUtils.projectNonColumnEquiConditions(
                        RelFactories.DEFAULT_PROJECT_FACTORY,
                        inputRels,
                        leftJoinKeys,
                        rightJoinKeys,
                        0,
                        leftKeys,
                        rightKeys);
        final boolean leftInputReplaced = inputRels[0] != leftRel;

        // Adjust right input fields in nonEquiConds if previous call modified the input
        if (leftInputReplaced) {
            nonEquiConds =
                    RexUtil.shift(
                            nonEquiConds,
                            leftRel.getRowType().getFieldCount(),
                            inputRels[0].getRowType().getFieldCount()
                                    - leftRel.getRowType().getFieldCount());
        }
        RexNode semiJoinCond =
                remainingEquiCond != null
                        ? RexUtil.composeConjunction(
                                cluster.getRexBuilder(),
                                Arrays.asList(remainingEquiCond, nonEquiConds),
                                false)
                        : nonEquiConds;
        RelNode semiJoin =
                LogicalJoin.create(
                        inputRels[0],
                        inputRels[1],
                        Collections.emptyList(),
                        semiJoinCond,
                        Collections.emptySet(),
                        calciteJoinType);
        if (!leftInputReplaced) {
            return semiJoin;
        }

        // Create join RR: the left RR has to be extended with the column(s) that the previous
        // call to projectNonColumnEquiConditions appended to the left input
        final int leftFieldCount = leftRel.getRowType().getFieldCount();
        HiveParserRowResolver newLeftRR = new HiveParserRowResolver();
        addColumnsWarnOnDuplicates(newLeftRR, leftRR);
        for (int i = leftFieldCount; i < inputRels[0].getRowType().getFieldCount(); i++) {
            ColumnInfo oColInfo =
                    new ColumnInfo(
                            getColumnInternalName(i),
                            HiveParserTypeConverter.convert(
                                    inputRels[0].getRowType().getFieldList().get(i).getType()),
                            null,
                            false);
            newLeftRR.put(oColInfo.getTabAlias(), oColInfo.getInternalName(), oColInfo);
        }

        HiveParserRowResolver joinRR = new HiveParserRowResolver();
        addColumnsWarnOnDuplicates(joinRR, newLeftRR);
        relToHiveColNameCalcitePosMap.put(semiJoin, buildHiveToCalciteColumnMap(joinRR));
        relToRowResolver.put(semiJoin, joinRR);

        // Introduce top project operator to remove additional column(s) that have been
        // introduced
        List<RexNode> topFields = new ArrayList<>();
        List<String> topFieldNames = new ArrayList<>();
        for (int i = 0; i < leftFieldCount; i++) {
            final RelDataTypeField field = leftRel.getRowType().getFieldList().get(i);
            topFields.add(leftRel.getCluster().getRexBuilder().makeInputRef(field.getType(), i));
            topFieldNames.add(field.getName());
        }
        return LogicalProject.create(semiJoin, Collections.emptyList(), topFields, topFieldNames);
    }

    /** Adds all columns of {@code source} to {@code target}, logging a warning on duplicates. */
    private void addColumnsWarnOnDuplicates(
            HiveParserRowResolver target, HiveParserRowResolver source) throws SemanticException {
        if (!HiveParserRowResolver.add(target, source)) {
            LOG.warn("Duplicates detected when adding columns to RR: see previous message");
        }
    }