in flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisSystem.java [114:141]
public List<String> extractColumnValuesBySQL(
String sql,
int columnIndex,
Predicate<String> filterFunc,
Object... params) {
List<String> columnValues = Lists.newArrayList();
try (PreparedStatement ps = jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(sql)) {
if (Objects.nonNull(params) && params.length > 0) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
}
}
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String columnValue = rs.getString(columnIndex);
if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
columnValues.add(columnValue);
}
}
return columnValues;
} catch (Exception e) {
throw new DorisSystemException(
String.format(
"The following SQL query could not be executed: %s", sql),
e);
}
}