private List<String> getTableNames()

in seatunnel-datasource/seatunnel-datasource-plugins/datasource-sqlserver-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/sqlserver/SqlServerCDCDataSourceChannel.java [149:204]


    /**
     * Lists the tables of {@code dbName} that are tracked by CDC, as {@code schema.table} names
     * ordered by schema and then table name.
     *
     * <p>Recognised {@code options} keys:
     *
     * <ul>
     *   <li>{@code filterName} - optional name filter. A value that splits into exactly two parts
     *       on {@code '.'} is treated as {@code schema.table}: both parts must match. Any other
     *       value is matched against the table name OR the schema name. A term without a
     *       {@code %} wildcard is matched as "contains".
     *   <li>{@code size} - optional maximum number of rows to return; must be an integer.
     * </ul>
     *
     * @param requestParams connection parameters handed to {@code init}
     * @param dbName database whose {@code sys.schemas}/{@code sys.tables} catalog views are queried
     * @param options optional filter/limit settings; {@code null} means no filter and no limit
     * @return the matching table names, possibly empty
     * @throws DataSourcePluginException if {@code dbName} is empty, {@code size} is not an
     *     integer, or the query fails
     */
    private List<String> getTableNames(
            Map<String, String> requestParams, String dbName, Map<String, String> options) {
        if (StringUtils.isEmpty(dbName)) {
            throw new DataSourcePluginException(
                    "get table names failed",
                    new IllegalArgumentException("database name must not be empty"));
        }
        // Identifiers cannot be bound as parameters, so bracket-quote the database name and
        // escape any closing bracket. This also makes names with '-' or spaces work.
        // NOTE(review): assumes callers pass the raw (unquoted) database name - confirm.
        final String quotedDbName = "[" + dbName.replace("]", "]]") + "]";

        StringBuilder sqlBuilder = new StringBuilder();
        sqlBuilder.append(
                String.format(
                        "SELECT SCHEMAS.NAME AS SCHEMA_NAME, TABLES.NAME AS TABLE_NAME"
                                + "    FROM %s.sys.schemas AS SCHEMAS"
                                + "        JOIN %s.sys.tables AS TABLES"
                                + "            ON SCHEMAS.SCHEMA_ID = TABLES.SCHEMA_ID"
                                + "                   AND TABLES.IS_TRACKED_BY_CDC = 1",
                        quotedDbName, quotedDbName));

        // LIKE patterns are bound as parameters (in this order) instead of being concatenated
        // into the statement, so quotes in the filter cannot alter the SQL.
        List<String> likeParams = new ArrayList<>();
        String filterName = options == null ? null : options.get("filterName");
        if (StringUtils.isNotEmpty(filterName)) {
            String[] split = filterName.split("\\.");
            if (split.length == 2) {
                // "schema.table": both parts must match.
                sqlBuilder.append(" AND (TABLES.NAME LIKE ? AND SCHEMAS.NAME LIKE ?)");
                likeParams.add(toLikePattern(split[1]));
                likeParams.add(toLikePattern(split[0]));
            } else {
                // Single term: match it against either the table or the schema name.
                String pattern = toLikePattern(filterName);
                sqlBuilder.append(" AND (TABLES.NAME LIKE ? OR SCHEMAS.NAME LIKE ?)");
                likeParams.add(pattern);
                likeParams.add(pattern);
            }
        }

        sqlBuilder.append(" ORDER BY SCHEMAS.NAME, TABLES.NAME");
        String size = options == null ? null : options.get("size");
        if (StringUtils.isNotEmpty(size)) {
            // Only a parsed integer is ever appended to the statement text.
            final int fetchSize;
            try {
                fetchSize = Integer.parseInt(size.trim());
            } catch (NumberFormatException e) {
                throw new DataSourcePluginException(
                        "get table names failed, invalid size: " + size, e);
            }
            sqlBuilder.append(" OFFSET 0 ROWS FETCH NEXT ").append(fetchSize).append(" ROWS ONLY");
        }

        final String sql = sqlBuilder.toString();
        log.info("execute sql :{}", sql);
        List<String> tableNames = new ArrayList<>();
        try (Connection connection = init(requestParams);
                java.sql.PreparedStatement statement = connection.prepareStatement(sql)) {
            for (int i = 0; i < likeParams.size(); i++) {
                // JDBC parameter indexes are 1-based.
                statement.setString(i + 1, likeParams.get(i));
            }
            try (ResultSet resultSet = statement.executeQuery()) {
                while (resultSet.next()) {
                    String schemaName = resultSet.getString("SCHEMA_NAME");
                    String tableName = resultSet.getString("TABLE_NAME");
                    tableNames.add(schemaName + "." + tableName);
                }
            }
            return tableNames;
        } catch (SQLException e) {
            throw new DataSourcePluginException("get table names failed", e);
        }
    }

    /** Returns {@code term} unchanged if it already contains {@code %}, else wraps it as {@code %term%}. */
    private static String toLikePattern(String term) {
        return term.contains("%") ? term : "%" + term + "%";
    }