private RelNode genUDTFPlan()

in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [2461:2685]
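
Translates a Hive generic UDTF invocation (either a UDTF in the SELECT list or a
LATERAL VIEW [OUTER]) into a Calcite subplan: a LogicalTableFunctionScan joined or
correlated with its input, topped by a project that keeps only the UDTF output when
the UDTF appears in SELECT.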


    private RelNode genUDTFPlan(
            SqlOperator sqlOperator,
            String genericUDTFName,
            String outputTableAlias,
            List<String> colAliases,
            HiveParserQB qb,
            List<RexNode> operands,
            List<ColumnInfo> opColInfos,
            RelNode input,
            boolean inSelect,
            boolean isOuter)
            throws SemanticException {
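        // OUTER comes from LATERAL VIEW OUTER, which pads the input row with NULLs when the
        // UDTF emits nothing; it has no meaning for a UDTF in the SELECT list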
        Preconditions.checkState(!isOuter || !inSelect, "OUTER is not supported for SELECT UDTF");
        // A UDTF in the SELECT list cannot be combined with GROUP BY / DISTRIBUTE BY /
        // SORT BY / CLUSTER BY, or with lateral views
        HiveParserQBParseInfo qbp = qb.getParseInfo();
        if (inSelect && !qbp.getDestToGroupBy().isEmpty()) {
            throw new SemanticException(ErrorMsg.UDTF_NO_GROUP_BY.getMsg());
        }
        if (inSelect && !qbp.getDestToDistributeBy().isEmpty()) {
            throw new SemanticException(ErrorMsg.UDTF_NO_DISTRIBUTE_BY.getMsg());
        }
        if (inSelect && !qbp.getDestToSortBy().isEmpty()) {
            throw new SemanticException(ErrorMsg.UDTF_NO_SORT_BY.getMsg());
        }
        if (inSelect && !qbp.getDestToClusterBy().isEmpty()) {
            throw new SemanticException(ErrorMsg.UDTF_NO_CLUSTER_BY.getMsg());
        }
        if (inSelect && !qbp.getAliasToLateralViews().isEmpty()) {
            throw new SemanticException(ErrorMsg.UDTF_LATERAL_VIEW.getMsg());
        }

        LOG.debug("Table alias: {} Col aliases: {}", outputTableAlias, colAliases);

        // Infer the UDTF's return type and derive the ObjectInspector describing its output
        // columns; the OI's struct fields drive alias inference and validation below
        RelDataType relDataType =
                HiveParserUtils.inferReturnTypeForOperands(
                        sqlOperator, operands, cluster.getTypeFactory());
        DataType dataType = HiveParserUtils.toDataType(relDataType);
        StructObjectInspector outputOI =
                (StructObjectInspector)
                        HiveInspectors.getObjectInspector(
                                HiveTypeUtil.toHiveTypeInfo(dataType, false));

        // Make up a table alias if one is not present, so that we can properly generate a
        // combined RowResolver; this should only happen for a UDTF in the SELECT list
        if (outputTableAlias == null) {
            Preconditions.checkState(inSelect, "Table alias not specified for lateral view");
            String prefix = "select_" + genericUDTFName + "_alias_";
            int i = 0;
            while (qb.getAliases().contains(prefix + i)) {
                i++;
            }
            outputTableAlias = prefix + i;
        }
        if (colAliases.isEmpty()) {
            // The user did not specify alias names; infer them from the output OI
            for (StructField field : outputOI.getAllStructFieldRefs()) {
                colAliases.add(field.getFieldName());
            }
        }
        // Make sure that the number of column aliases in the AS clause matches the number of
        // columns output by the UDTF
        int numOutputCols = outputOI.getAllStructFieldRefs().size();
        int numSuppliedAliases = colAliases.size();
        if (numOutputCols != numSuppliedAliases) {
            throw new SemanticException(
                    ErrorMsg.UDTF_ALIAS_MISMATCH.getMsg(
                            "expected "
                                    + numOutputCols
                                    + " aliases "
                                    + "but got "
                                    + numSuppliedAliases));
        }

        // Generate the output ColumnInfos / row resolver using internal names.
        List<ColumnInfo> udtfOutputCols = new ArrayList<>();

        Iterator<String> colAliasesIter = colAliases.iterator();
        for (StructField sf : outputOI.getAllStructFieldRefs()) {
            String colAlias = colAliasesIter.next();
            assert (colAlias != null);

            // Since the UDTF feeds into a LateralViewJoin (LVJ) operator that will rename
            // all the internal names, we can just use the field name from the UDTF's OI as
            // the internal name
            ColumnInfo col =
                    new ColumnInfo(
                            sf.getFieldName(),
                            TypeInfoUtils.getTypeInfoFromObjectInspector(
                                    sf.getFieldObjectInspector()),
                            outputTableAlias,
                            false);
            udtfOutputCols.add(col);
        }

        // Create the row resolver for the table function scan
        HiveParserRowResolver udtfOutRR = new HiveParserRowResolver();
        for (int i = 0; i < udtfOutputCols.size(); i++) {
            udtfOutRR.put(outputTableAlias, colAliases.get(i), udtfOutputCols.get(i));
        }

        // Build the row type for the scan from the output fields' <name, type> pairs
        RelDataType retType = HiveParserTypeConverter.getType(cluster, udtfOutRR, null);

        List<RelDataType> argTypes = new ArrayList<>();

        RelDataTypeFactory dtFactory = cluster.getRexBuilder().getTypeFactory();
        for (ColumnInfo ci : opColInfos) {
            argTypes.add(HiveParserUtils.toRelDataType(ci.getType(), dtFactory));
        }

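        // Wrap the UDTF in a Calcite SqlOperator so a RexCall can be built over the operands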
        SqlOperator calciteOp =
                HiveParserSqlFunctionConverter.getCalciteFn(
                        genericUDTFName, argTypes, retType, false, funcConverter);

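        // Build a RexCall that invokes the UDTF over the operands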
        RexNode rexNode = cluster.getRexBuilder().makeCall(calciteOp, operands);

        // Convert the call so that the UDTF is resolved to Flink's BridgingSqlFunction
        TableFunctionConverter udtfConverter =
                new TableFunctionConverter(
                        cluster,
                        input,
                        frameworkConfig.getOperatorTable(),
                        catalogReader.nameMatcher());
        RexCall convertedCall = (RexCall) rexNode.accept(udtfConverter);

        SqlOperator convertedOperator = convertedCall.getOperator();
        Preconditions.checkState(
                HiveParserUtils.isBridgingSqlFunction(convertedOperator),
                "Expect operator to be "
                        + HiveParserUtils.BRIDGING_SQL_FUNCTION_CLZ_NAME
                        + ", actually got "
                        + convertedOperator.getClass().getSimpleName());

        // TODO: how to decide this?
        Type elementType = Object[].class;
        // create LogicalTableFunctionScan
        RelNode tableFunctionScan =
                LogicalTableFunctionScan.create(
                        input.getCluster(),
                        Collections.emptyList(),
                        convertedCall,
                        elementType,
                        retType,
                        null);

        // Remember the table alias for the UDTF so that its columns can be referenced later
        qb.addAlias(outputTableAlias);

        RelNode correlRel;
        RexBuilder rexBuilder = cluster.getRexBuilder();
        // find correlation in the converted call
        Pair<List<CorrelationId>, ImmutableBitSet> correlUse = getCorrelationUse(convertedCall);
        // If the converted call references no input columns, a cartesian join on a literal
        // TRUE condition is enough; otherwise build a LogicalCorrelate
        if (correlUse == null) {
            correlRel =
                    calciteContext
                            .createRelBuilder()
                            .push(input)
                            .push(tableFunctionScan)
                            .join(
                                    isOuter ? JoinRelType.LEFT : JoinRelType.INNER,
                                    rexBuilder.makeLiteral(true))
                            .build();
        } else {
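            // Multiple correlation ids can appear when several operands reference the input;
            // rewrite them to a single variable so one LogicalCorrelate suffices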
            if (correlUse.left.size() > 1) {
                tableFunctionScan =
                        DeduplicateCorrelateVariables.go(
                                rexBuilder,
                                correlUse.left.get(0),
                                Util.skip(correlUse.left),
                                tableFunctionScan);
            }
            correlRel =
                    LogicalCorrelate.create(
                            input,
                            tableFunctionScan,
                            correlUse.left.get(0),
                            correlUse.right,
                            isOuter ? JoinRelType.LEFT : JoinRelType.INNER);
        }

        // Register the new rels and their RowResolvers in the planner's bookkeeping maps
        relToHiveColNameCalcitePosMap.put(
                tableFunctionScan, buildHiveToCalciteColumnMap(udtfOutRR));
        relToRowResolver.put(tableFunctionScan, udtfOutRR);

        HiveParserRowResolver correlRR =
                HiveParserRowResolver.getCombinedRR(
                        relToRowResolver.get(input), relToRowResolver.get(tableFunctionScan));
        relToHiveColNameCalcitePosMap.put(correlRel, buildHiveToCalciteColumnMap(correlRR));
        relToRowResolver.put(correlRel, correlRR);

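        // For LATERAL VIEW the combined (input + UDTF output) row is exactly what downstream
        // operators expect, so return the correlate/join as-is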
        if (!inSelect) {
            return correlRel;
        }

        // Create a project that keeps only the UDTF's output columns, i.e. the fields that
        // come after the input's columns in the correlated row
        List<RexNode> projects = new ArrayList<>();
        HiveParserRowResolver projectRR = new HiveParserRowResolver();
        int j = 0;
        for (int i = input.getRowType().getFieldCount();
                i < correlRel.getRowType().getFieldCount();
                i++) {
            projects.add(cluster.getRexBuilder().makeInputRef(correlRel, i));
            ColumnInfo inputColInfo = correlRR.getRowSchema().getSignature().get(i);
            String colAlias = inputColInfo.getAlias();
            ColumnInfo colInfo =
                    new ColumnInfo(
                            getColumnInternalName(j++),
                            inputColInfo.getObjectInspector(),
                            null,
                            false);
            projectRR.put(null, colAlias, colInfo);
        }
        RelNode projectNode =
                LogicalProject.create(
                        correlRel,
                        Collections.emptyList(),
                        projects,
                        tableFunctionScan.getRowType());
        relToHiveColNameCalcitePosMap.put(projectNode, buildHiveToCalciteColumnMap(projectRR));
        relToRowResolver.put(projectNode, projectRR);
        return projectNode;
    }
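
For context, here is a minimal sketch of the two query shapes that reach this method.
It is illustrative only: it assumes flink-connector-hive is on the classpath and that a
HiveCatalog is registered and made current (the Hive dialect requires one); the catalog
name, hive-conf path, and table `t` are hypothetical.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class UdtfPlanExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Hypothetical catalog name and conf dir; genUDTFPlan only runs under the Hive dialect.
        tEnv.registerCatalog("myhive", new HiveCatalog("myhive", "default", "/path/to/hive-conf"));
        tEnv.useCatalog("myhive");
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

        // UDTF in the SELECT list (inSelect == true): a table alias such as
        // "select_explode_alias_0" is generated and a trailing project keeps only the
        // UDTF output column.
        tEnv.executeSql("SELECT explode(t.arr) AS c FROM t").print();

        // LATERAL VIEW (inSelect == false): the correlate/join node is returned directly,
        // so the input columns remain visible next to the UDTF output.
        tEnv.executeSql("SELECT t.id, lv.c FROM t LATERAL VIEW explode(t.arr) lv AS c").print();
    }
}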