private WriteModel processUpsert()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/serialization/MongoRowDataSerializationSchema.java [68:88]


    /**
     * Builds the write operation used to upsert the given row.
     *
     * <p>If the primary key extractor yields a key, the result is an {@code UpdateOneModel} with
     * upsert enabled: its filter matches on {@code _id} plus whatever fields the shard key
     * extractor returns, and its update is a {@code $set} of the converted document without
     * {@code _id}. If no key is available, the converted document is wrapped in an
     * {@code InsertOneModel} instead.
     *
     * @param row the row to convert into a write operation
     * @return an upserting update model when a primary key exists, otherwise an insert model
     */
    private WriteModel<BsonDocument> processUpsert(RowData row) {
        final BsonDocument converted = rowDataToBsonConverter.convert(row);
        final BsonValue primaryKey = primaryKeyExtractor.apply(row);

        // Without a primary key there is nothing to match on, so fall back to a plain insert.
        if (primaryKey == null) {
            return new InsertOneModel<>(converted);
        }

        final BsonDocument query = new BsonDocument("_id", primaryKey);

        // An upsert against a sharded collection needs the complete shard key in its filter.
        final BsonDocument shardKeys = shardKeysExtractor.apply(row);
        if (!shardKeys.isEmpty()) {
            query.putAll(shardKeys);
        }

        // _id is immutable; dropping it from the $set payload avoids an exception on update.
        converted.remove("_id");
        final BsonDocument setOperation = new BsonDocument("$set", converted);
        final UpdateOptions upsertOptions = new UpdateOptions().upsert(true);

        return new UpdateOneModel<>(query, setOperation, upsertOptions);
    }