in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoKeyExtractor.java [88:129]
/**
 * Builds the function that derives the MongoDB document key from a {@link RowData}.
 *
 * <p>Resolution rules, in order:
 *
 * <ul>
 *   <li>No primary key declared and no {@code RESERVED_ID} column in the schema: the shared
 *       {@code APPEND_ONLY_KEY_EXTRACTOR} is returned.
 *   <li>A {@code RESERVED_ID} column exists: it must be the one and only primary key column,
 *       otherwise the key would be ambiguous and the schema is rejected.
 *   <li>Otherwise the key type is either the type of the single primary key column, or the
 *       projection of the physical row type onto the primary key indexes for a compound key.
 * </ul>
 *
 * <p>The resulting key type is handed to {@code MongoValidationUtils.validatePrimaryKey}
 * before the extractor is created.
 *
 * @param resolvedSchema schema whose primary key and columns determine the extractor
 * @return the append-only extractor, or a {@code MongoKeyExtractor} over the primary key
 * @throws IllegalArgumentException if a {@code RESERVED_ID} column exists but is not the sole
 *     primary key column
 * @throws IllegalStateException if the single primary key index does not resolve to a column
 */
public static SerializableFunction<RowData, BsonValue> createKeyExtractor(
        ResolvedSchema resolvedSchema) {
    final Optional<UniqueConstraint> declaredKey = resolvedSchema.getPrimaryKey();
    final int[] keyIndexes = resolvedSchema.getPrimaryKeyIndexes();
    final boolean hasDeclaredKey = declaredKey.isPresent();
    final boolean hasReservedId = resolvedSchema.getColumn(RESERVED_ID).isPresent();

    // Nothing to derive a key from: neither a declared primary key nor a reserved _id column.
    if (!(hasDeclaredKey || hasReservedId)) {
        return APPEND_ONLY_KEY_EXTRACTOR;
    }

    if (hasReservedId) {
        // With an _id column in the schema, the only unambiguous setup is a
        // single-column primary key that is the _id column itself.
        final boolean reservedIdIsSoleKey =
                hasDeclaredKey
                        && !isCompoundPrimaryKey(keyIndexes)
                        && primaryKeyContainsReservedId(declaredKey.get());
        if (!reservedIdIsSoleKey) {
            throw new IllegalArgumentException(
                    "Ambiguous keys being used due to the presence of an _id field. "
                            + "Either use the _id column as the key, or rename the _id column.");
        }
    }

    final DataType keyType;
    if (!isCompoundPrimaryKey(keyIndexes)) {
        // Single-column key: use that column's data type directly.
        final int soleKeyIndex = keyIndexes[0];
        keyType =
                resolvedSchema
                        .getColumn(soleKeyIndex)
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                String.format(
                                                        "No primary key column found with index '%s'.",
                                                        soleKeyIndex)))
                        .getDataType();
    } else {
        // Compound key: project the physical row type down to the key columns.
        final DataType physicalRowType = resolvedSchema.toPhysicalRowDataType();
        keyType = Projection.of(keyIndexes).project(physicalRowType);
    }

    MongoValidationUtils.validatePrimaryKey(keyType);

    return new MongoKeyExtractor(keyType.getLogicalType(), keyIndexes);
}