private static RelNode sqlToRel()

in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java [109:187]


    /**
     * Validates a parsed SQL statement and converts it into a Calcite relational expression.
     *
     * <p>The statement is resolved against a freshly created root schema that contains a single
     * sub-schema named {@code DEFAULT_SCHEMA}, built by {@code TransformSchemaFactory} from the
     * given columns after {@code copyFillMetadataColumn} has been applied to them. Every
     * user-defined function is loaded by class name, registered on the root schema, and exposed
     * to the validator as a variadic {@code USER_DEFINED_FUNCTION} operator.
     *
     * @param columns columns describing the table the statement is validated against
     * @param sqlNode the parsed statement to validate and convert
     * @param udfDescriptors descriptors of the user-defined functions that may be referenced;
     *     each class must expose an {@code eval} method
     * @param supportedMetadataColumns metadata columns handed to {@code copyFillMetadataColumn}
     *     together with {@code columns}
     * @return the {@code rel} of the converted query root
     * @throws RuntimeException if a UDF class cannot be found by its class name
     */
    private static RelNode sqlToRel(
            List<Column> columns,
            SqlNode sqlNode,
            List<UserDefinedFunctionDescriptor> udfDescriptors,
            SupportedMetadataColumn[] supportedMetadataColumns) {
        // --- 1. Build the schema the statement is validated against. ---
        List<Column> enrichedColumns = copyFillMetadataColumn(columns, supportedMetadataColumns);
        CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
        SchemaPlus rootSchemaPlus = rootSchema.plus();

        Map<String, Object> schemaOperand = new HashMap<>();
        schemaOperand.put("tableName", DEFAULT_TABLE);
        schemaOperand.put("columns", enrichedColumns);
        rootSchema.add(
                DEFAULT_SCHEMA,
                TransformSchemaFactory.INSTANCE.create(
                        rootSchemaPlus, DEFAULT_SCHEMA, schemaOperand));

        // --- 2. Register every UDF on the schema and collect its operator. ---
        List<SqlFunction> udfOperators = new ArrayList<>();
        for (UserDefinedFunctionDescriptor udf : udfDescriptors) {
            try {
                Class<?> udfClass = Class.forName(udf.getClasspath());
                ScalarFunction scalarFunction = ScalarFunctionImpl.create(udfClass, "eval");
                Preconditions.checkNotNull(
                        scalarFunction, "UDF function must provide at least one `eval` method.");

                final SqlReturnTypeInference returnTypeInference;
                if (udf.getReturnTypeHint() == null) {
                    // No hint available: fall back to what the eval method itself reports.
                    returnTypeInference =
                            binding -> scalarFunction.getReturnType(binding.getTypeFactory());
                } else {
                    // A return type hint is present: translate it to a Calcite type on demand.
                    returnTypeInference =
                            binding ->
                                    convertCalciteType(
                                            binding.getTypeFactory(), udf.getReturnTypeHint());
                }

                rootSchemaPlus.add(udf.getName(), scalarFunction);
                SqlFunction udfOperator =
                        new SqlFunction(
                                udf.getName(),
                                SqlKind.OTHER_FUNCTION,
                                returnTypeInference,
                                InferTypes.RETURN_TYPE,
                                OperandTypes.VARIADIC,
                                SqlFunctionCategory.USER_DEFINED_FUNCTION);
                udfOperators.add(udfOperator);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException("Failed to resolve UDF: " + udf, e);
            }
        }

        // --- 3. Validate the statement with built-in and user-defined operators. ---
        SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
        CalciteCatalogReader catalogReader =
                new CalciteCatalogReader(
                        rootSchema,
                        rootSchema.path(DEFAULT_SCHEMA),
                        typeFactory,
                        new CalciteConnectionConfigImpl(new Properties()));

        TransformSqlOperatorTable builtInOperators = TransformSqlOperatorTable.instance();
        SqlOperatorTable userOperators = SqlOperatorTables.of(udfOperators);
        SqlOperatorTable allOperators = SqlOperatorTables.chain(builtInOperators, userOperators);
        SqlValidator.Config validatorConfig =
                SqlValidator.Config.DEFAULT
                        .withIdentifierExpansion(true)
                        .withConformance(SqlConformanceEnum.MYSQL_5);
        SqlValidator validator =
                SqlValidatorUtil.newValidator(
                        allOperators, catalogReader, typeFactory, validatorConfig);
        SqlNode validatedNode = validator.validate(sqlNode);

        // --- 4. Convert the validated statement into a relational expression. ---
        RelOptCluster cluster =
                RelOptCluster.create(
                        new HepPlanner(new HepProgramBuilder().build()),
                        new RexBuilder(typeFactory));
        SqlToRelConverter converter =
                new SqlToRelConverter(
                        null,
                        validator,
                        catalogReader,
                        cluster,
                        StandardConvertletTable.INSTANCE,
                        SqlToRelConverter.config().withTrimUnusedFields(true));
        return converter.convertQuery(validatedNode, false, false).rel;
    }