private BulkProcessor createBulkProcessor()

in flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/sink/OpensearchWriter.java [166:222]


    /**
     * Builds the {@link BulkProcessor} that submits bulk requests through {@code client}.
     *
     * <p>Each flush threshold (action count, size in MB, flush interval) is applied to the builder
     * only when its configured value differs from {@code -1}; otherwise that builder setting is
     * left untouched. The retry policy is selected from the configured backoff type, and the
     * number of concurrent requests is fixed to {@code 0}.
     *
     * <p>NOTE(review): for a threshold equal to {@code -1} whatever default the builder itself
     * carries stays in effect - confirm this matches the intended meaning of {@code -1}.
     *
     * @param bulkProcessorConfig supplies the flush thresholds and the backoff settings
     * @return the configured bulk processor
     * @throws IllegalArgumentException if the configured backoff type is not handled by this method
     */
    private BulkProcessor createBulkProcessor(BulkProcessorConfig bulkProcessorConfig) {
        // Deliberately an anonymous class: written as a lambda, deserialization fails.
        final BulkRequestConsumerFactory bulkSender =
                new BulkRequestConsumerFactory() {
                    @Override
                    public void accept(
                            BulkRequest request, ActionListener<BulkResponse> responseListener) {
                        client.bulkAsync(request, RequestOptions.DEFAULT, responseListener);
                    }
                };
        final BulkProcessor.Builder processorBuilder =
                BulkProcessor.builder(bulkSender, new BulkListener());

        // Flush thresholds: a value of -1 means "do not touch this builder setting".
        final int maxActions = bulkProcessorConfig.getBulkFlushMaxActions();
        if (maxActions != -1) {
            processorBuilder.setBulkActions(maxActions);
        }

        final long maxSizeMb = bulkProcessorConfig.getBulkFlushMaxMb();
        if (maxSizeMb != -1) {
            processorBuilder.setBulkSize(new ByteSizeValue(maxSizeMb, ByteSizeUnit.MB));
        }

        final long flushInterval = bulkProcessorConfig.getBulkFlushInterval();
        if (flushInterval != -1) {
            processorBuilder.setFlushInterval(new TimeValue(flushInterval));
        }

        // The delay and retry limit are read up front, regardless of which backoff type applies.
        final TimeValue retryDelay =
                new TimeValue(bulkProcessorConfig.getBulkFlushBackOffDelay());
        final int retryLimit = bulkProcessorConfig.getBulkFlushBackoffRetries();

        final BackoffPolicy retryPolicy;
        switch (bulkProcessorConfig.getFlushBackoffType()) {
            case NONE:
                retryPolicy = BackoffPolicy.noBackoff();
                break;
            case CONSTANT:
                retryPolicy = BackoffPolicy.constantBackoff(retryDelay, retryLimit);
                break;
            case EXPONENTIAL:
                retryPolicy = BackoffPolicy.exponentialBackoff(retryDelay, retryLimit);
                break;
            default:
                throw new IllegalArgumentException(
                        "Received unknown backoff policy type "
                                + bulkProcessorConfig.getFlushBackoffType());
        }
        processorBuilder.setBackoffPolicy(retryPolicy);

        // Zero concurrent requests makes flush() blocking.
        processorBuilder.setConcurrentRequests(0);

        return processorBuilder.build();
    }