public String getPKsFromADBPGTable()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/AdbpgDialect.java [96:137]


    public String getPKsFromADBPGTable(Connection conn, String targetSchema, String targetTable) {

        if (this.tablePKs.size() > 0) {
            String pks = Arrays.toString(tablePKs.toArray());
            LOG.info("Table pks:" + pks);
            return pks;
        }

        final String DEFAULT_POSTGRESQL_SCHEMA = "public";
        try {
            // The assembled upsert statement is used to query all primary key column names form the given adbpg table
            PreparedStatement pstm = conn.prepareStatement(
                    "SELECT a.attname " +
                            "FROM pg_constraint AS c " +
                            "CROSS JOIN LATERAL UNNEST(c.conkey) AS cols(colnum) " +
                            "INNER JOIN pg_attribute AS a " +
                            "ON a.attrelid = c.conrelid AND cols.colnum = a.attnum " +
                            "WHERE c.contype = 'p' AND c.conrelid = ?::REGCLASS"
            );
            String schema_name = quoteIdentifier(targetSchema);
            String table_name = quoteIdentifier(targetTable);

            pstm.setString(1, schema_name + "." + table_name);

            LOG.info("Getting primary keys of table " + schema_name + "." + table_name + " with sql:" + pstm.toString());

            ResultSet resultSet = pstm.executeQuery();
            while (resultSet.next()) {
                tablePKs.add(resultSet.getString(1));
            }

            String pks = Arrays.toString(tablePKs.toArray());

            LOG.info("Table pks:" + pks);

            return pks;
        } catch (SQLException e) {
            LOG.error("Get adbpg table primary keys error.", e);
            System.exit(255);
        }
        return null;
    }