in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java [155:188]
protected Optional<UniqueConstraint> getPrimaryKey(
DatabaseMetaData metaData, String database, String schema, String table)
throws SQLException {
// According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
// In the currently supported database dialects MySQL and Postgres,
// the database term is equivalent to catalog term.
// We need to pass the database name as catalog parameter for retrieving primary keys by
// full table identifier.
ResultSet rs = metaData.getPrimaryKeys(database, schema, table);
Map<Integer, String> keySeqColumnName = new HashMap<>();
String pkName = null;
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
int keySeq = rs.getInt("KEY_SEQ");
Preconditions.checkState(
!keySeqColumnName.containsKey(keySeq - 1),
"The field(s) of primary key must be from the same table.");
keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
}
List<String> pkFields =
Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
keySeqColumnName.forEach(pkFields::set);
if (!pkFields.isEmpty()) {
// PK_NAME maybe null according to the javadoc, generate an unique name in that case
pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
}
return Optional.empty();
}