in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableFactory.java [144:163]
public DynamicTableSink createDynamicTableSink(Context context) {
    // Creates the MongoDB table sink for the given factory context: the table options
    // are wrapped in a MongoConfiguration, validated through the factory helper, and
    // then combined with the resolved schema to construct the sink.
    final FactoryUtil.TableFactoryHelper factoryHelper =
            FactoryUtil.createTableFactoryHelper(this, context);

    // The configuration wrapper is created first; validation runs right after it and
    // before anything schema-related is touched.
    final MongoConfiguration mongoConfig = new MongoConfiguration(factoryHelper.getOptions());
    factoryHelper.validate();

    final ResolvedSchema resolvedSchema = context.getCatalogTable().getResolvedSchema();

    // Function that maps a row to the BsonValue key, built from the resolved schema.
    final SerializableFunction<RowData, BsonValue> primaryKeyExtractor =
            MongoKeyExtractor.createKeyExtractor(resolvedSchema);

    return new MongoDynamicTableSink(
            getConnectionOptions(mongoConfig),
            getWriteOptions(mongoConfig),
            mongoConfig.getSinkParallelism(),
            // Upsert flag: true exactly when the schema declares a primary key.
            resolvedSchema.getPrimaryKey().isPresent(),
            context.getPhysicalRowDataType(),
            primaryKeyExtractor);
}