in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/AdbpgDialect.java [96:137]
public String getPKsFromADBPGTable(Connection conn, String targetSchema, String targetTable) {
if (this.tablePKs.size() > 0) {
String pks = Arrays.toString(tablePKs.toArray());
LOG.info("Table pks:" + pks);
return pks;
}
final String DEFAULT_POSTGRESQL_SCHEMA = "public";
try {
// The assembled upsert statement is used to query all primary key column names form the given adbpg table
PreparedStatement pstm = conn.prepareStatement(
"SELECT a.attname " +
"FROM pg_constraint AS c " +
"CROSS JOIN LATERAL UNNEST(c.conkey) AS cols(colnum) " +
"INNER JOIN pg_attribute AS a " +
"ON a.attrelid = c.conrelid AND cols.colnum = a.attnum " +
"WHERE c.contype = 'p' AND c.conrelid = ?::REGCLASS"
);
String schema_name = quoteIdentifier(targetSchema);
String table_name = quoteIdentifier(targetTable);
pstm.setString(1, schema_name + "." + table_name);
LOG.info("Getting primary keys of table " + schema_name + "." + table_name + " with sql:" + pstm.toString());
ResultSet resultSet = pstm.executeQuery();
while (resultSet.next()) {
tablePKs.add(resultSet.getString(1));
}
String pks = Arrays.toString(tablePKs.toArray());
LOG.info("Table pks:" + pks);
return pks;
} catch (SQLException e) {
LOG.error("Get adbpg table primary keys error.", e);
System.exit(255);
}
return null;
}