public SinkRuntimeProvider getSinkRuntimeProvider()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSink.java [94:116]


    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        final RowDataToBsonConverter rowDataToBsonConverter =
                RowDataToBsonConverters.createConverter(
                        (RowType) resolvedSchema.toPhysicalRowDataType().getLogicalType());

        final MongoRowDataSerializationSchema serializationSchema =
                new MongoRowDataSerializationSchema(
                        rowDataToBsonConverter, primaryKeyExtractor, shardKeysExtractor);

        final MongoSink<RowData> mongoSink =
                MongoSink.<RowData>builder()
                        .setUri(connectionOptions.getUri())
                        .setDatabase(connectionOptions.getDatabase())
                        .setCollection(connectionOptions.getCollection())
                        .setBatchSize(writeOptions.getBatchSize())
                        .setBatchIntervalMs(writeOptions.getBatchIntervalMs())
                        .setDeliveryGuarantee(writeOptions.getDeliveryGuarantee())
                        .setMaxRetries(writeOptions.getMaxRetries())
                        .setSerializationSchema(serializationSchema)
                        .build();

        return SinkV2Provider.of(mongoSink, parallelism);
    }