protected BulkProcessor buildBulkProcessor()

in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java [363:389]


    protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
        checkNotNull(listener);

        BulkProcessor.Builder bulkProcessorBuilder =
                callBridge.createBulkProcessorBuilder(client, listener);

        // This makes flush() blocking
        bulkProcessorBuilder.setConcurrentRequests(0);

        if (bulkProcessorFlushMaxActions != null) {
            bulkProcessorBuilder.setBulkActions(bulkProcessorFlushMaxActions);
        }

        if (bulkProcessorFlushMaxSizeMb != null) {
            configureBulkSize(bulkProcessorBuilder);
        }

        if (bulkProcessorFlushIntervalMillis != null) {
            configureFlushInterval(bulkProcessorBuilder);
        }

        // if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
        callBridge.configureBulkProcessorBackoff(
                bulkProcessorBuilder, bulkProcessorFlushBackoffPolicy);

        return bulkProcessorBuilder.build();
    }