in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchValidationUtils.java [66:89]
/**
 * Checks that every field of the given primary key type has a type root that is permitted for
 * primary key columns.
 *
 * <p>A field of a distinct type is judged by the type root of its source type rather than by
 * {@link LogicalTypeRoot#DISTINCT_TYPE} itself.
 *
 * @param primaryKeyDataType data type whose field data types are inspected
 * @throws ValidationException if at least one field's type root is not contained in {@code
 *     ALLOWED_PRIMARY_KEY_TYPES}; the message lists the offending type roots
 */
public static void validatePrimaryKey(DataType primaryKeyDataType) {
    List<LogicalTypeRoot> rejectedRoots =
            DataType.getFieldDataTypes(primaryKeyDataType).stream()
                    .map(DataType::getLogicalType)
                    // Unwrap distinct types so the underlying source type decides legality.
                    .map(
                            type ->
                                    type.is(LogicalTypeRoot.DISTINCT_TYPE)
                                            ? ((DistinctType) type)
                                                    .getSourceType()
                                                    .getTypeRoot()
                                            : type.getTypeRoot())
                    .filter(root -> !ALLOWED_PRIMARY_KEY_TYPES.contains(root))
                    .collect(Collectors.toList());

    // Nothing to report: every key column maps to an allowed type root.
    if (rejectedRoots.isEmpty()) {
        return;
    }

    throw new ValidationException(
            String.format(
                    "The table has a primary key on columns of illegal types: %s.",
                    rejectedRoots));
}