in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java [79:100]
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    // Assembles the runtime sink for this table: a row-to-BSON serialization schema
    // is derived from the physical row type, and a MongoSink is configured from the
    // connection and write options held by this instance.
    final RowType physicalRowType = (RowType) physicalRowDataType.getLogicalType();

    // The schema combines the per-row BSON converter with the key extractor.
    final MongoRowDataSerializationSchema schema =
            new MongoRowDataSerializationSchema(
                    RowDataToBsonConverters.createConverter(physicalRowType), keyExtractor);

    // Connection settings are applied first, then write settings; the serialization
    // schema is attached last, right before the sink is built.
    final MongoSink<RowData> sink =
            MongoSink.<RowData>builder()
                    .setUri(connectionOptions.getUri())
                    .setDatabase(connectionOptions.getDatabase())
                    .setCollection(connectionOptions.getCollection())
                    .setBatchSize(writeOptions.getBatchSize())
                    .setBatchIntervalMs(writeOptions.getBatchIntervalMs())
                    .setDeliveryGuarantee(writeOptions.getDeliveryGuarantee())
                    .setMaxRetries(writeOptions.getMaxRetries())
                    .setSerializationSchema(schema)
                    .build();

    // Hand the sink to the planner together with the parallelism stored on this instance.
    return SinkV2Provider.of(sink, parallelism);
}