in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java [122:187]
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
final JdbcRowDataInputFormat.Builder builder =
JdbcRowDataInputFormat.builder()
.setDrivername(options.getDriverName())
.setDBUrl(options.getDbURL())
.setUsername(options.getUsername().orElse(null))
.setPassword(options.getPassword().orElse(null))
.setAutoCommit(readOptions.getAutoCommit());
if (readOptions.getFetchSize() != 0) {
builder.setFetchSize(readOptions.getFetchSize());
}
final JdbcDialect dialect = options.getDialect();
String query =
dialect.getSelectFromStatement(
options.getTableName(),
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
new String[0]);
final List<String> predicates = new ArrayList<String>();
if (readOptions.getPartitionColumnName().isPresent()) {
long lowerBound = readOptions.getPartitionLowerBound().get();
long upperBound = readOptions.getPartitionUpperBound().get();
int numPartitions = readOptions.getNumPartitions().get();
Serializable[][] allPushdownParams = replicatePushdownParamsForN(numPartitions);
JdbcParameterValuesProvider allParams =
new CompositeJdbcParameterValuesProvider(
new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)
.ofBatchNum(numPartitions),
new JdbcGenericParameterValuesProvider(allPushdownParams));
builder.setParametersProvider(allParams);
predicates.add(
dialect.quoteIdentifier(readOptions.getPartitionColumnName().get())
+ " BETWEEN ? AND ?");
} else {
builder.setParametersProvider(
new JdbcGenericParameterValuesProvider(replicatePushdownParamsForN(1)));
}
predicates.addAll(this.resolvedPredicates);
if (predicates.size() > 0) {
String joinedConditions =
predicates.stream()
.map(pred -> String.format("(%s)", pred))
.collect(Collectors.joining(" AND "));
query += " WHERE " + joinedConditions;
}
if (limit >= 0) {
query = String.format("%s %s", query, dialect.getLimitClause(limit));
}
LOG.debug("Query generated for JDBC scan: " + query);
builder.setQuery(query);
final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
builder.setRowConverter(dialect.getRowConverter(rowType));
builder.setRowDataTypeInfo(
runtimeProviderContext.createTypeInformation(physicalRowDataType));
return InputFormatProvider.of(builder.build());
}