in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchValidationUtils.java [59:91]
/**
 * Verifies that the primary key of the given schema, if one is declared, only covers columns
 * whose type roots are not listed in {@code ILLEGAL_PRIMARY_KEY_TYPES}.
 *
 * <p>A schema without a primary key passes trivially. Columns of a distinct type are judged by
 * the type root of their source type.
 *
 * @param schema the table schema whose primary key columns are inspected
 * @throws ValidationException if at least one primary key column resolves to an illegal type
 *     root
 */
public static void validatePrimaryKey(TableSchema schema) {
    schema.getPrimaryKey()
            .ifPresent(key -> rejectIllegalPrimaryKeyColumnTypes(schema, key.getColumns()));
}

/**
 * Collects the illegal type roots among the given key columns and fails if any were found.
 *
 * <p>NOTE(review): the data type of each column is unwrapped with {@code Optional.get()}; this
 * presumably relies on the schema guaranteeing that every primary key column exists - confirm.
 */
private static void rejectIllegalPrimaryKeyColumnTypes(
        TableSchema schema, List<String> keyColumns) {
    List<LogicalTypeRoot> offendingRoots =
            keyColumns.stream()
                    .map(column -> schema.getFieldDataType(column).get().getLogicalType())
                    .map(columnType -> primaryKeyEffectiveTypeRoot(columnType))
                    .filter(ILLEGAL_PRIMARY_KEY_TYPES::contains)
                    .collect(Collectors.toList());

    if (offendingRoots.isEmpty()) {
        return;
    }

    throw new ValidationException(
            String.format(
                    "The table has a primary key on columns of illegal types: %s.\n"
                            + " Elasticsearch sink does not support primary keys on columns of types: %s.",
                    offendingRoots, ILLEGAL_PRIMARY_KEY_TYPES));
}

/**
 * Returns the type root used for the primary key check: the source type's root for a distinct
 * type, otherwise the root of the type itself.
 */
private static LogicalTypeRoot primaryKeyEffectiveTypeRoot(LogicalType columnType) {
    return columnType.is(LogicalTypeRoot.DISTINCT_TYPE)
            ? ((DistinctType) columnType).getSourceType().getTypeRoot()
            : columnType.getTypeRoot();
}