in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/common/utils/MongoValidationUtils.java [112:134]
public static void validatePrimaryKey(DataType primaryKeyDataType) {
List<DataType> fieldDataTypes = DataType.getFieldDataTypes(primaryKeyDataType);
List<DataType> illegalTypes = new ArrayList<>();
for (DataType fieldType : fieldDataTypes) {
LogicalTypeRoot typeRoot = fieldType.getLogicalType().getTypeRoot();
if (!ALLOWED_PRIMARY_KEY_TYPES.contains(typeRoot)) {
illegalTypes.add(fieldType);
if (!DENIED_PRIMARY_KEY_TYPES.contains(typeRoot)) {
LOG.warn(
"Detected newly added root type {} that should to be explicitly accepted"
+ " or rejected. Please reach out to the Flink maintainers.",
fieldType);
}
}
}
if (!illegalTypes.isEmpty()) {
throw new ValidationException(
String.format(
"The table has a primary key on columns of illegal types: %s. Allowed types are %s.",
illegalTypes, ALLOWED_PRIMARY_KEY_TYPES));
}
}