public DynamicTableSink createDynamicTableSink()

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java [144:163]


    public DynamicTableSink createDynamicTableSink(Context context) {
        // Creates the sink side of the table: read and validate the table options, then derive
        // the upsert flag and the key extractor from the resolved schema.
        final FactoryUtil.TableFactoryHelper factoryHelper =
                FactoryUtil.createTableFactoryHelper(this, context);

        // Wrap the raw options first, then let the helper validate them.
        final MongoConfiguration mongoConfig =
                new MongoConfiguration(factoryHelper.getOptions());
        factoryHelper.validate();

        final ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();

        // The sink is flagged as upsert exactly when the schema declares a primary key.
        final boolean upsertEnabled = resolvedSchema.getPrimaryKey().isPresent();

        // Function handed to the sink that maps each row to a BsonValue key.
        final SerializableFunction<RowData, BsonValue> rowKeyExtractor =
                MongoKeyExtractor.createKeyExtractor(resolvedSchema);

        return new MongoDynamicTableSink(
                getConnectionOptions(mongoConfig),
                getWriteOptions(mongoConfig),
                mongoConfig.getSinkParallelism(),
                upsertEnabled,
                context.getPhysicalRowDataType(),
                rowKeyExtractor);
    }