public ScanRuntimeProvider getScanRuntimeProvider()

in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java [122:187]


    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        final JdbcRowDataInputFormat.Builder builder =
                JdbcRowDataInputFormat.builder()
                        .setDrivername(options.getDriverName())
                        .setDBUrl(options.getDbURL())
                        .setUsername(options.getUsername().orElse(null))
                        .setPassword(options.getPassword().orElse(null))
                        .setAutoCommit(readOptions.getAutoCommit());

        if (readOptions.getFetchSize() != 0) {
            builder.setFetchSize(readOptions.getFetchSize());
        }
        final JdbcDialect dialect = options.getDialect();
        String query =
                dialect.getSelectFromStatement(
                        options.getTableName(),
                        DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
                        new String[0]);
        final List<String> predicates = new ArrayList<String>();

        if (readOptions.getPartitionColumnName().isPresent()) {
            long lowerBound = readOptions.getPartitionLowerBound().get();
            long upperBound = readOptions.getPartitionUpperBound().get();
            int numPartitions = readOptions.getNumPartitions().get();

            Serializable[][] allPushdownParams = replicatePushdownParamsForN(numPartitions);
            JdbcParameterValuesProvider allParams =
                    new CompositeJdbcParameterValuesProvider(
                            new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
                                    .ofBatchNum(numPartitions),
                            new JdbcGenericParameterValuesProvider(allPushdownParams));

            builder.setParametersProvider(allParams);

            predicates.add(
                    dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
                            + " BETWEEN ? AND ?");
        } else {
            builder.setParametersProvider(
                    new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
        }

        predicates.addAll(this.resolvedPredicates);

        if (predicates.size() > 0) {
            String joinedConditions =
                    predicates.stream()
                            .map(pred -> String.format("(%s)", pred))
                            .collect(Collectors.joining(" AND "));
            query += " WHERE " + joinedConditions;
        }

        if (limit >= 0) {
            query = String.format("%s %s", query, dialect.getLimitClause(limit));
        }

        LOG.debug("Query generated for JDBC scan: " + query);

        builder.setQuery(query);
        final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
        builder.setRowConverter(dialect.getRowConverter(rowType));
        builder.setRowDataTypeInfo(
                runtimeProviderContext.createTypeInformation(physicalRowDataType));

        return InputFormatProvider.of(builder.build());
    }