public List extractColumnValuesBySQL()

in src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java [87:115]


    public List<String> extractColumnValuesBySQL(
            String sql, int columnIndex, Predicate<String> filterFunc, Object... params) {

        List<String> columnValues = Lists.newArrayList();

        try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection();
                PreparedStatement ps = connection.prepareStatement(sql)) {

            if (Objects.nonNull(params) && params.length > 0) {
                for (int i = 0; i < params.length; i++) {
                    ps.setObject(i + 1, params[i]);
                }
            }

            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    String columnValue = rs.getString(columnIndex);
                    if (filterFunc == null || filterFunc.test(columnValue)) {
                        columnValues.add(columnValue);
                    }
                }
            }
            return columnValues;
        } catch (Exception e) {
            LOG.error("The following SQL query could not be executed: {}", sql, e);
            throw new DorisException(
                    String.format("The following SQL query could not be executed: %s", sql), e);
        }
    }