in src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java [87:115]
public List<String> extractColumnValuesBySQL(
String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {
List<String> columnValues = Lists.newArrayList();
try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
if (Objects.nonNull(params) && params.length > 0) {
for (int i = 0; i < params.length; i++) {
ps.setObject(i + 1, params[i]);
}
}
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String columnValue = rs.getString(columnIndex);
if (filterFunc == null || filterFunc.test(columnValue)) {
columnValues.add(columnValue);
}
}
}
return columnValues;
} catch (Exception e) {
LOG.error("The following SQL query could not be executed: {}", sql, e);
throw new DorisException(
String.format("The following SQL query could not be executed: %s", sql), e);
}
}