public ScanRuntimeProvider getScanRuntimeProvider()

in flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java [94:143]


    /**
     * Builds the runtime provider that scans the Doris table.
     *
     * <p>Before the provider is created, {@code readOptions} is updated in place:
     *
     * <ul>
     *   <li>If {@code resolvedFilterQuery} is not empty, its entries are joined with {@code AND}.
     *       When a filter query is already configured on {@code readOptions}, the two are
     *       combined as {@code (configured) AND (resolved)}; otherwise the joined string becomes
     *       the filter query.
     *   <li>If no read fields are configured, they default to every field name of {@code
     *       physicalRowDataType}, each trimmed, stripped of backticks and wrapped in backticks.
     * </ul>
     *
     * <p>NOTE(review): because {@code readOptions} is mutated, invoking this method more than
     * once on the same instance would AND the resolved filters onto the filter query again.
     * Confirm whether callers may invoke it repeatedly.
     *
     * @param runtimeProviderContext scan context supplied by the caller; not read by this method
     * @return an {@code InputFormatProvider} when {@code readOptions.getUseOldApi()} is true,
     *     otherwise a {@code SourceProvider} wrapping a {@code DorisSource}
     * @throws RuntimeException if looking up the Doris partitions fails with a {@code
     *     DorisException} (old API path only); the original exception is attached as the cause
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        if (!resolvedFilterQuery.isEmpty()) {
            String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
            if (!StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
                // Parenthesize both sides so the configured filter and the resolved filters
                // keep their own operator precedence once combined.
                filterQuery =
                        String.format("(%s) AND (%s)", readOptions.getFilterQuery(), filterQuery);
            }
            readOptions.setFilterQuery(filterQuery);
        }

        if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
            // No explicit projection configured: read every physical column, back-quoted.
            readOptions.setReadFields(
                    DataType.getFieldNames(physicalRowDataType).stream()
                            .map(item -> String.format("`%s`", item.trim().replace("`", "")))
                            .collect(Collectors.joining(", ")));
        }

        if (readOptions.getUseOldApi()) {
            List<PartitionDefinition> dorisPartitions;
            try {
                dorisPartitions = RestService.findPartitions(options, readOptions, LOG);
            } catch (DorisException e) {
                // Keep the original exception as the cause so the root failure is not lost.
                throw new RuntimeException("Failed fetch doris partitions", e);
            }
            DorisRowDataInputFormat.Builder builder =
                    DorisRowDataInputFormat.builder()
                            .setFenodes(options.getFenodes())
                            .setBenodes(options.getBenodes())
                            .setUsername(options.getUsername())
                            .setPassword(options.getPassword())
                            .setTableIdentifier(options.getTableIdentifier())
                            .setPartitions(dorisPartitions)
                            .setReadOptions(readOptions)
                            .setRowType((RowType) physicalRowDataType.getLogicalType());
            return InputFormatProvider.of(builder.build());
        } else {
            // Read data using the interface of the FLIP-27 specification
            DorisSource<RowData> build =
                    DorisSource.<RowData>builder()
                            .setDorisReadOptions(readOptions)
                            .setDorisOptions(options)
                            .setDeserializer(
                                    new RowDataDeserializationSchema(
                                            (RowType) physicalRowDataType.getLogicalType()))
                            .build();
            return SourceProvider.of(build);
        }
    }