in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoShardKeysExtractor.java [62:82]
public BsonDocument apply(RowData rowData) {
    // Builds the shard-key document for the given row: the shard-key field is read from the
    // row and converted to a BSON document; a null field yields EMPTY_DOCUMENT instead.
    Object rawShardKeys = shardKeysGetter.getFieldOrNull(rowData);
    BsonDocument result =
            rawShardKeys == null
                    ? EMPTY_DOCUMENT
                    : shardKeysConverter.apply(rawShardKeys).asDocument();

    // Values are replaced in place on existing entries; no keys are added or removed.
    result.entrySet()
            .forEach(
                    field -> {
                        // Only string-typed values are candidates for restoration.
                        if (!field.getValue().isString()) {
                            return;
                        }
                        String text = field.getValue().asString().getValue();
                        // Try to restore MongoDB's ObjectId from string: every string that
                        // ObjectId.isValid accepts is swapped for a BsonObjectId, all other
                        // strings are left untouched.
                        if (!ObjectId.isValid(text)) {
                            return;
                        }
                        field.setValue(new BsonObjectId(new ObjectId(text)));
                    });
    return result;
}