in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/serialization/MongoRowDataSerializationSchema.java [68:88]
private WriteModel<BsonDocument> processUpsert(RowData row) {
final BsonDocument document = rowDataToBsonConverter.convert(row);
final BsonValue key = primaryKeyExtractor.apply(row);
if (key != null) {
BsonDocument filter = new BsonDocument("_id", key);
// For upsert operation on a sharded collection, the full sharded key must be included
// in the filter.
BsonDocument shardKeysFilter = shardKeysExtractor.apply(row);
if (!shardKeysFilter.isEmpty()) {
filter.putAll(shardKeysFilter);
}
// _id is immutable, so we remove it here to prevent exception.
document.remove("_id");
BsonDocument update = new BsonDocument("$set", document);
return new UpdateOneModel<>(filter, update, new UpdateOptions().upsert(true));
} else {
return new InsertOneModel<>(document);
}
}