in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java [94:124]
/**
 * Creates the runtime provider used to scan the Doris table.
 *
 * <p>Before choosing a provider, the read-field list on {@code readOptions} is overwritten with
 * the physical schema's field names: each name is trimmed, stripped of any backticks, wrapped
 * in a single pair of backticks, and the results are joined with {@code ", "}.
 *
 * <p>When {@code readOptions.getUseOldApi()} is true, the partitions are looked up through
 * {@code RestService.findPartitions} and an {@code InputFormatProvider} wrapping a
 * {@code DorisRowDataInputFormat} is returned. Otherwise a {@code DorisSource} is built and
 * returned through {@code SourceProvider}.
 *
 * @param runtimeProviderContext scan context supplied by the caller; not read by this method
 * @return the provider for the selected read path
 * @throws RuntimeException if the partition lookup fails with a {@code DorisException}; the
 *     original exception is attached as the cause
 */
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    // Quote every column name exactly once, dropping backticks the name may already carry.
    readOptions.setReadFields(Arrays.stream(physicalSchema.getFieldNames())
            .map(item -> String.format("`%s`", item.trim().replace("`", "")))
            .collect(Collectors.joining(", ")));

    // Both read paths need the same row type, so derive it a single time.
    RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();

    if (readOptions.getUseOldApi()) {
        List<PartitionDefinition> dorisPartitions;
        try {
            dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
        } catch (DorisException e) {
            // Chain the cause so the underlying failure stays visible in the stack trace.
            throw new RuntimeException("Failed fetch doris partitions", e);
        }
        DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
                .setFenodes(options.getFenodes())
                .setUsername(options.getUsername())
                .setPassword(options.getPassword())
                .setTableIdentifier(options.getTableIdentifier())
                .setPartitions(dorisPartitions)
                .setReadOptions(readOptions)
                .setRowType(rowType);
        return InputFormatProvider.of(builder.build());
    } else {
        // Read data using the interface of the FLIP-27 specification
        DorisSource<RowData> source = DorisSourceBuilder.<RowData>builder()
                .setDorisReadOptions(readOptions)
                .setDorisOptions(options)
                .setDeserializer(new RowDataDeserializationSchema(rowType))
                .build();
        return SourceProvider.of(source);
    }
}