in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/AdbpgDynamicTableSource.java [101:131]
/**
 * Builds the runtime provider used to scan the configured table.
 *
 * <p>The method assembles a SELECT statement over every field of {@code tableSchema}. When a
 * partition column is configured in the read options, it computes the per-split parameter
 * values and appends a {@code BETWEEN %s AND %s} predicate on that column. When {@code limit}
 * is non-negative, a {@code limit} clause is appended. The statement and the parameter values
 * are handed to an {@link AdbpgDataScanFunction}.
 *
 * <p>NOTE(review): the {@code %s} placeholders are left unresolved here; presumably
 * {@code AdbpgDataScanFunction} substitutes each split's bounds into them - confirm against
 * that class.
 *
 * @param runtimeProviderContext scan context supplied by the planner; not read by this method
 * @return an {@link InputFormatProvider} wrapping the scan input format
 */
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    final JdbcReadOptions scanOptions = AdbpgOptions.getJdbcReadOptions(config);
    final AdbpgDialect dialect =
            new AdbpgDialect(
                    config.get(AdbpgOptions.TARGET_SCHEMA),
                    AdbpgOptions.isConfigOptionTrue(config, AdbpgOptions.CASE_SENSITIVE));

    // Projection of all schema fields; the empty array means no condition fields are passed.
    String sql =
            dialect.getSelectFromStatement(
                    config.get(AdbpgOptions.TABLE_NAME),
                    tableSchema.getFieldNames(),
                    new String[0]);

    // Stays null when no partition column is configured.
    Object[][] splitBounds = null;
    if (scanOptions.getPartitionColumnName().isPresent()) {
        final long lower = scanOptions.getPartitionLowerBound().get();
        final long upper = scanOptions.getPartitionUpperBound().get();
        final int batchCount = scanOptions.getNumPartitions().get();

        final JdbcParameterValuesProvider boundsProvider =
                new JdbcNumericBetweenParametersProvider(lower, upper).ofBatchNum(batchCount);
        splitBounds = boundsProvider.getParameterValues();

        final String partitionColumn =
                dialect.quoteIdentifier(scanOptions.getPartitionColumnName().get());
        // The two %s markers are kept literally in the statement text at this point.
        sql = sql + " WHERE " + partitionColumn + " BETWEEN %s AND %s";
    }

    // A negative limit means no limit clause is added.
    if (limit >= 0) {
        // sql is passed as an argument, so any %s markers inside it are copied through as-is.
        sql = String.format("%s limit %s", sql, limit);
    }

    final InputFormat<RowData, InputSplit> scanFormat =
            new AdbpgDataScanFunction(
                    fieldNum, fieldNamesStr, lts, config, tableSchema, splitBounds, sql);
    return InputFormatProvider.of(scanFormat);
}