in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java [109:187]
private static RelNode sqlToRel(
List<Column> columns,
SqlNode sqlNode,
List<UserDefinedFunctionDescriptor> udfDescriptors,
SupportedMetadataColumn[] supportedMetadataColumns) {
List<Column> columnsWithMetadata =
copyFillMetadataColumn(columns, supportedMetadataColumns);
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true);
SchemaPlus schema = rootSchema.plus();
Map<String, Object> operand = new HashMap<>();
operand.put("tableName", DEFAULT_TABLE);
operand.put("columns", columnsWithMetadata);
rootSchema.add(
DEFAULT_SCHEMA,
TransformSchemaFactory.INSTANCE.create(schema, DEFAULT_SCHEMA, operand));
List<SqlFunction> udfFunctions = new ArrayList<>();
for (UserDefinedFunctionDescriptor udf : udfDescriptors) {
try {
Class<?> clazz = Class.forName(udf.getClasspath());
SqlReturnTypeInference returnTypeInference;
ScalarFunction function = ScalarFunctionImpl.create(clazz, "eval");
Preconditions.checkNotNull(
function, "UDF function must provide at least one `eval` method.");
if (udf.getReturnTypeHint() != null) {
// This UDF has return type hint annotation
returnTypeInference =
o -> {
RelDataTypeFactory typeFactory = o.getTypeFactory();
DataType returnTypeHint = udf.getReturnTypeHint();
return convertCalciteType(typeFactory, returnTypeHint);
};
} else {
// Infer it from eval method return type
returnTypeInference = o -> function.getReturnType(o.getTypeFactory());
}
schema.add(udf.getName(), function);
udfFunctions.add(
new SqlFunction(
udf.getName(),
SqlKind.OTHER_FUNCTION,
returnTypeInference,
InferTypes.RETURN_TYPE,
OperandTypes.VARIADIC,
SqlFunctionCategory.USER_DEFINED_FUNCTION));
} catch (ClassNotFoundException e) {
throw new RuntimeException("Failed to resolve UDF: " + udf, e);
}
}
SqlTypeFactoryImpl factory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
CalciteCatalogReader calciteCatalogReader =
new CalciteCatalogReader(
rootSchema,
rootSchema.path(DEFAULT_SCHEMA),
factory,
new CalciteConnectionConfigImpl(new Properties()));
TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance();
SqlOperatorTable udfOperatorTable = SqlOperatorTables.of(udfFunctions);
SqlValidator validator =
SqlValidatorUtil.newValidator(
SqlOperatorTables.chain(transformSqlOperatorTable, udfOperatorTable),
calciteCatalogReader,
factory,
SqlValidator.Config.DEFAULT
.withIdentifierExpansion(true)
.withConformance(SqlConformanceEnum.MYSQL_5));
SqlNode validateSqlNode = validator.validate(sqlNode);
SqlToRelConverter sqlToRelConverter =
new SqlToRelConverter(
null,
validator,
calciteCatalogReader,
RelOptCluster.create(
new HepPlanner(new HepProgramBuilder().build()),
new RexBuilder(factory)),
StandardConvertletTable.INSTANCE,
SqlToRelConverter.config().withTrimUnusedFields(true));
RelRoot relRoot = sqlToRelConverter.convertQuery(validateSqlNode, false, false);
return relRoot.rel;
}