public static void validatePrimaryKey()

in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java [59:91]


    /**
     * Ensures that no primary key column of the given schema has a type root listed in {@code
     * ILLEGAL_PRIMARY_KEY_TYPES}.
     *
     * <p>Nothing is checked when the schema declares no primary key. A column of a distinct type is
     * judged by the type root of its source type rather than by {@code DISTINCT_TYPE} itself.
     *
     * @param schema table schema whose primary key columns are inspected
     * @throws ValidationException if at least one primary key column has an illegal type root; the
     *     message lists the offending roots and the full set of unsupported roots
     */
    public static void validatePrimaryKey(TableSchema schema) {
        schema.getPrimaryKey()
                .ifPresent(
                        primaryKey -> {
                            List<LogicalTypeRoot> offendingRoots =
                                    primaryKey.getColumns().stream()
                                            // Step 1: look up the logical type of each key column.
                                            .map(
                                                    column ->
                                                            schema.getFieldDataType(column)
                                                                    .get()
                                                                    .getLogicalType())
                                            // Step 2: unwrap distinct types to the root of their
                                            // source type; other types contribute their own root.
                                            .map(
                                                    type ->
                                                            type.is(LogicalTypeRoot.DISTINCT_TYPE)
                                                                    ? ((DistinctType) type)
                                                                            .getSourceType()
                                                                            .getTypeRoot()
                                                                    : type.getTypeRoot())
                                            // Step 3: keep only the roots that are not allowed.
                                            .filter(root -> ILLEGAL_PRIMARY_KEY_TYPES.contains(root))
                                            .collect(Collectors.toList());

                            if (offendingRoots.isEmpty()) {
                                // Every key column has a supported type; nothing to report.
                                return;
                            }

                            String message =
                                    String.format(
                                            "The table has a primary key on columns of illegal types: %s.\n"
                                                    + " Elasticsearch sink does not support primary keys on columns of types: %s.",
                                            offendingRoots, ILLEGAL_PRIMARY_KEY_TYPES);
                            throw new ValidationException(message);
                        });
    }