public ScanRuntimeProvider getScanRuntimeProvider()

in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java [94:124]


    /**
     * Builds the runtime provider used to scan the Doris table.
     *
     * <p>Before creating the provider, the read-field list on {@code readOptions} is set to the
     * physical schema's field names: each name is trimmed, stripped of any backticks, wrapped in
     * backticks again, and the results are joined with {@code ", "}.
     *
     * <p>When {@code readOptions.getUseOldApi()} is true, the partitions are looked up through
     * {@code RestService.findPartitions} and an {@code InputFormatProvider} wrapping a
     * {@code DorisRowDataInputFormat} is returned. Otherwise a {@code SourceProvider} wrapping a
     * {@code DorisSource} (FLIP-27 interface) is returned.
     *
     * @param runtimeProviderContext the scan context; not referenced by this method
     * @return the provider for the selected read path
     * @throws RuntimeException if the partition lookup on the old-API path fails; the original
     *     {@code DorisException} is attached as the cause
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        readOptions.setReadFields(Arrays.stream(physicalSchema.getFieldNames())
                .map(item -> String.format("`%s`", item.trim().replace("`", "")))
                .collect(Collectors.joining(", ")));

        // Both read paths need the same row type, so derive it once.
        RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();

        if (readOptions.getUseOldApi()) {
            List<PartitionDefinition> dorisPartitions;
            try {
                dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
            } catch (DorisException e) {
                // Chain the original exception so the root cause is not lost.
                throw new RuntimeException("Failed fetch doris partitions", e);
            }
            DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
                    .setFenodes(options.getFenodes())
                    .setUsername(options.getUsername())
                    .setPassword(options.getPassword())
                    .setTableIdentifier(options.getTableIdentifier())
                    .setPartitions(dorisPartitions)
                    .setReadOptions(readOptions)
                    .setRowType(rowType);
            return InputFormatProvider.of(builder.build());
        } else {
            // Read data using the interface of the FLIP-27 specification
            DorisSource<RowData> build = DorisSourceBuilder.<RowData>builder()
                    .setDorisReadOptions(readOptions)
                    .setDorisOptions(options)
                    .setDeserializer(new RowDataDeserializationSchema(rowType))
                    .build();
            return SourceProvider.of(build);
        }
    }