in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java [94:143]
/**
 * Builds the runtime provider that scans the Doris table.
 *
 * <p>Before the provider is created, {@code readOptions} is updated in place:
 *
 * <ul>
 *   <li>the entries of {@code resolvedFilterQuery} are joined with {@code " AND "} and, when a
 *       filter query is already configured, combined with it as {@code (existing) AND (new)};
 *   <li>when no read fields are configured, they are derived from the field names of {@code
 *       physicalRowDataType}, each wrapped in backticks.
 * </ul>
 *
 * <p>If {@code readOptions.getUseOldApi()} is true, the result is an {@code InputFormatProvider}
 * wrapping a {@code DorisRowDataInputFormat}; otherwise it is a {@code SourceProvider} wrapping
 * a {@code DorisSource}.
 *
 * @param runtimeProviderContext scan context supplied by the caller; not read by this method
 * @return the provider for the selected read path
 * @throws RuntimeException if looking up the Doris partitions fails on the old-API path; the
 *     original {@code DorisException} is attached as the cause
 */
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    if (!resolvedFilterQuery.isEmpty()) {
        String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
        if (!StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
            // Parenthesize both sides so operator precedence inside either part is preserved.
            filterQuery =
                    String.format("(%s) AND (%s)", readOptions.getFilterQuery(), filterQuery);
        }
        // NOTE(review): readOptions is mutated here, so a second call on the same instance
        // would presumably AND the same filters onto the stored query again — confirm whether
        // this method can be invoked more than once per instance.
        readOptions.setFilterQuery(filterQuery);
    }
    if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
        // Strip any backticks already present in a name before quoting it, so the generated
        // field list never contains nested or unbalanced backticks.
        readOptions.setReadFields(
                DataType.getFieldNames(physicalRowDataType).stream()
                        .map(item -> String.format("`%s`", item.trim().replace("`", "")))
                        .collect(Collectors.joining(", ")));
    }
    if (readOptions.getUseOldApi()) {
        List<PartitionDefinition> dorisPartitions;
        try {
            dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
        } catch (DorisException e) {
            // Keep the original exception as the cause so the underlying failure is not lost.
            throw new RuntimeException("Failed fetch doris partitions", e);
        }
        DorisRowDataInputFormat.Builder builder =
                DorisRowDataInputFormat.builder()
                        .setFenodes(options.getFenodes())
                        .setBenodes(options.getBenodes())
                        .setUsername(options.getUsername())
                        .setPassword(options.getPassword())
                        .setTableIdentifier(options.getTableIdentifier())
                        .setPartitions(dorisPartitions)
                        .setReadOptions(readOptions)
                        .setRowType((RowType) physicalRowDataType.getLogicalType());
        return InputFormatProvider.of(builder.build());
    } else {
        // Read data using the interface of the FLIP-27 specification
        DorisSource<RowData> build =
                DorisSource.<RowData>builder()
                        .setDorisReadOptions(readOptions)
                        .setDorisOptions(options)
                        .setDeserializer(
                                new RowDataDeserializationSchema(
                                        (RowType) physicalRowDataType.getLogicalType()))
                        .build();
        return SourceProvider.of(build);
    }
}