public ScanRuntimeProvider getScanRuntimeProvider()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/AdbpgDynamicTableSource.java [101:131]


    /**
     * Creates the runtime provider used to scan the configured ADBPG table.
     *
     * <p>A {@code SELECT} statement over every field of {@code tableSchema} is generated through
     * {@link AdbpgDialect}. When the read options declare a partition column, a
     * {@code BETWEEN %s AND %s} predicate on that column is appended and the per-partition
     * bounds are computed with {@link JdbcNumericBetweenParametersProvider}. A non-negative
     * {@code limit} is appended as a trailing {@code limit} clause.
     *
     * @param runtimeProviderContext planner-supplied scan context; not read by this method
     * @return an {@link InputFormatProvider} wrapping an {@link AdbpgDataScanFunction}
     */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        final JdbcReadOptions jdbcReadOptions = AdbpgOptions.getJdbcReadOptions(config);
        final AdbpgDialect dialect =
                new AdbpgDialect(
                        config.get(AdbpgOptions.TARGET_SCHEMA),
                        AdbpgOptions.isConfigOptionTrue(config, AdbpgOptions.CASE_SENSITIVE));

        // Base statement: select all schema fields from the table, with no condition fields.
        String selectSql =
                dialect.getSelectFromStatement(
                        config.get(AdbpgOptions.TABLE_NAME),
                        tableSchema.getFieldNames(),
                        new String[0]);

        // Remains null when no partition column is configured.
        Object[][] partitionBounds = null;
        final boolean partitionedScan = jdbcReadOptions.getPartitionColumnName().isPresent();
        if (partitionedScan) {
            final long lower = jdbcReadOptions.getPartitionLowerBound().get();
            final long upper = jdbcReadOptions.getPartitionUpperBound().get();
            final int batchCount = jdbcReadOptions.getNumPartitions().get();

            // Split [lower, upper] into batchCount ranges; each row holds one range's bounds.
            final JdbcParameterValuesProvider boundsProvider =
                    new JdbcNumericBetweenParametersProvider(lower, upper).ofBatchNum(batchCount);
            partitionBounds = boundsProvider.getParameterValues();

            // The %s markers are left literal here; presumably AdbpgDataScanFunction fills them
            // in from partitionBounds for each split -- TODO confirm against that class.
            selectSql =
                    selectSql
                            + " WHERE "
                            + dialect.quoteIdentifier(jdbcReadOptions.getPartitionColumnName().get())
                            + " BETWEEN %s AND %s";
        }

        // A negative limit means no limit clause is added.
        if (limit >= 0) {
            // selectSql is passed as an argument, so any %s it contains is kept verbatim.
            selectSql = String.format("%s limit %s", selectSql, limit);
        }

        final InputFormat<RowData, InputSplit> scanFormat =
                new AdbpgDataScanFunction(
                        fieldNum,
                        fieldNamesStr,
                        lts,
                        config,
                        tableSchema,
                        partitionBounds,
                        selectSql);
        return InputFormatProvider.of(scanFormat);
    }