in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java [512:530]
protected static List<String> extractColumnValuesByStatement(
PreparedStatement ps, int columnIndex, Predicate<String> filterFunc, Object... params)
throws SQLException {
List<String> columnValues = Lists.newArrayList();
if (Objects.nonNull(params) && params.length > 0) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
}
}
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String columnValue = rs.getString(columnIndex);
if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
columnValues.add(columnValue);
}
}
}
return columnValues;
}