private BulkProcessor buildBulkProcessor()

in flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSink.java [380:408]


    private BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
        checkNotNull(listener);

        BulkProcessor.Builder bulkProcessorBuilder =
                BulkProcessor.builder(
                        (request, bulkListener) ->
                                client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                        listener);

        // This makes flush() blocking
        bulkProcessorBuilder.setConcurrentRequests(0);

        if (bulkProcessorFlushMaxActions != null) {
            bulkProcessorBuilder.setBulkActions(bulkProcessorFlushMaxActions);
        }

        if (bulkProcessorFlushMaxSizeMb != null) {
            configureBulkSize(bulkProcessorBuilder);
        }

        if (bulkProcessorFlushIntervalMillis != null) {
            configureFlushInterval(bulkProcessorBuilder);
        }

        // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
        configureBulkProcessorBackoff(bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);

        return bulkProcessorBuilder.build();
    }