protected Optional getPrimaryKey()

in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/database/catalog/AbstractJdbcCatalog.java [181:214]


    /**
     * Looks up the primary key of a table through the JDBC metadata.
     *
     * <p>The key columns are returned in ascending {@code KEY_SEQ} order. When the driver reports
     * no {@code PK_NAME}, a name of the form {@code pk_<col1>_<col2>...} is generated from the
     * ordered column names. Two rows reporting the same {@code KEY_SEQ} fail the {@code
     * Preconditions.checkState} check.
     *
     * @param metaData JDBC metadata used to query the primary key
     * @param database database name, passed as the catalog argument of {@code getPrimaryKeys}
     * @param schema schema name, passed as the schema argument of {@code getPrimaryKeys}
     * @param table table name, passed as the table argument of {@code getPrimaryKeys}
     * @return the primary key constraint, or {@link Optional#empty()} when no key column is
     *     reported for the table
     * @throws SQLException if the metadata query or reading its result fails
     */
    protected Optional<UniqueConstraint> getPrimaryKey(
            DatabaseMetaData metaData, String database, String schema, String table)
            throws SQLException {

        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
        // We need to sort them based on the KEY_SEQ value.
        // In the currently supported database dialects MySQL and Postgres,
        // the database term is equivalent to catalog term.
        // We need to pass the database name as catalog parameter for retrieving primary keys by
        // full table identifier.
        Map<Integer, String> keySeqColumnName = new HashMap<>();
        String pkName = null;
        // try-with-resources releases the ResultSet even if reading a row or the check throws.
        try (ResultSet rs = metaData.getPrimaryKeys(database, schema, table)) {
            while (rs.next()) {
                String columnName = rs.getString("COLUMN_NAME");
                pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
                int keySeq = rs.getInt("KEY_SEQ");
                Preconditions.checkState(
                        !keySeqColumnName.containsKey(keySeq),
                        "The field(s) of primary key must be from the same table.");
                keySeqColumnName.put(keySeq, columnName);
            }
        }

        if (keySeqColumnName.isEmpty()) {
            return Optional.empty();
        }

        // Order the columns by KEY_SEQ explicitly rather than using KEY_SEQ as a list index,
        // so that a non-contiguous or non-1-based sequence does not run out of bounds.
        Integer[] orderedKeySeqs = keySeqColumnName.keySet().toArray(new Integer[0]);
        Arrays.sort(orderedKeySeqs);
        String[] orderedColumns = new String[orderedKeySeqs.length];
        for (int i = 0; i < orderedKeySeqs.length; i++) {
            orderedColumns[i] = keySeqColumnName.get(orderedKeySeqs[i]);
        }
        List<String> pkFields = Arrays.asList(orderedColumns);

        // PK_NAME maybe null according to the javadoc, generate an unique name in that case
        pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
        return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
    }