in flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSink.java [380:408]
/**
 * Assembles the {@link BulkProcessor} used by this sink.
 *
 * <p>Bulk requests are dispatched through {@code client.bulkAsync} with
 * {@code RequestOptions.DEFAULT}, and their outcome is reported to the supplied listener.
 * The optional flush thresholds (max actions, max size, flush interval) are applied only
 * when the corresponding field is non-null.
 *
 * @param listener callback handed to the bulk processor; validated with {@code checkNotNull}
 * @return the bulk processor produced by the configured builder
 */
private BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
    checkNotNull(listener);

    final BulkProcessor.Builder builder =
            BulkProcessor.builder(
                    (bulkRequest, responseListener) ->
                            client.bulkAsync(
                                    bulkRequest, RequestOptions.DEFAULT, responseListener),
                    listener);

    // Zero concurrent requests: this makes flush() blocking.
    builder.setConcurrentRequests(0);

    // Each flush threshold is optional; a null field means "leave the builder untouched".
    if (bulkProcessorFlushMaxActions != null) {
        builder.setBulkActions(bulkProcessorFlushMaxActions);
    }
    if (bulkProcessorFlushMaxSizeMb != null) {
        configureBulkSize(builder);
    }
    if (bulkProcessorFlushIntervalMillis != null) {
        configureFlushInterval(builder);
    }

    // Always invoked: the policy is null when backoff retrying is disabled, and the
    // helper receives that null as-is.
    configureBulkProcessorBackoff(builder, bulkProcessorFlushBackoffPolicy);

    return builder.build();
}