flink-connector-elasticsearch6/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch6SinkBuilder.java [63:133]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            ElasticsearchEmitter<? super T> emitter) {
        super.<T>setEmitter(emitter);
        return self();
    }

    @Override
    protected BulkProcessorBuilderFactory getBulkProcessorBuilderFactory() {
        return new BulkProcessorBuilderFactory() {
            /**
             * Creates a {@link BulkProcessor.Builder} whose bulk requests are sent through the
             * given client and whose flush/backoff settings come from the given config.
             *
             * <p>Max actions, max size (in MB) and flush interval are applied only when the
             * configured value differs from {@code -1}; otherwise the corresponding setter is not
             * called. A backoff policy is always set on the builder.
             *
             * @param client client whose {@code bulkAsync} is invoked for every bulk request
             * @param bulkProcessorConfig flush thresholds and backoff settings to apply
             * @param listener listener handed to the bulk processor builder
             * @return the configured builder
             * @throws IllegalArgumentException if the configured backoff type is not handled here
             */
            @Override
            public BulkProcessor.Builder apply(
                    RestHighLevelClient client,
                    BulkProcessorConfig bulkProcessorConfig,
                    BulkProcessor.Listener listener) {
                // Deliberately an anonymous class: writing this as a lambda makes
                // deserialization fail.
                final BulkRequestConsumerFactory bulkSender =
                        new BulkRequestConsumerFactory() {
                            @Override
                            public void accept(
                                    BulkRequest request,
                                    ActionListener<BulkResponse> responseListener) {
                                client.bulkAsync(request, RequestOptions.DEFAULT, responseListener);
                            }
                        };
                final BulkProcessor.Builder processorBuilder =
                        BulkProcessor.builder(bulkSender, listener);

                // -1 marks a threshold as "not configured"; such thresholds are left untouched.
                final int maxActions = bulkProcessorConfig.getBulkFlushMaxActions();
                if (maxActions != -1) {
                    processorBuilder.setBulkActions(maxActions);
                }

                final long maxSizeMb = bulkProcessorConfig.getBulkFlushMaxMb();
                if (maxSizeMb != -1) {
                    processorBuilder.setBulkSize(new ByteSizeValue(maxSizeMb, ByteSizeUnit.MB));
                }

                final long flushInterval = bulkProcessorConfig.getBulkFlushInterval();
                if (flushInterval != -1) {
                    processorBuilder.setFlushInterval(new TimeValue(flushInterval));
                }

                // Delay and retry count are read up front, regardless of the backoff type.
                final TimeValue retryDelay =
                        new TimeValue(bulkProcessorConfig.getBulkFlushBackOffDelay());
                final int retryLimit = bulkProcessorConfig.getBulkFlushBackoffRetries();
                switch (bulkProcessorConfig.getFlushBackoffType()) {
                    case NONE:
                        processorBuilder.setBackoffPolicy(BackoffPolicy.noBackoff());
                        break;
                    case CONSTANT:
                        processorBuilder.setBackoffPolicy(
                                BackoffPolicy.constantBackoff(retryDelay, retryLimit));
                        break;
                    case EXPONENTIAL:
                        processorBuilder.setBackoffPolicy(
                                BackoffPolicy.exponentialBackoff(retryDelay, retryLimit));
                        break;
                    default:
                        throw new IllegalArgumentException(
                                "Received unknown backoff policy type "
                                        + bulkProcessorConfig.getFlushBackoffType());
                }
                return processorBuilder;
            }
        };
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch7SinkBuilder.java [62:131]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            ElasticsearchEmitter<? super T> emitter) {
        super.<T>setEmitter(emitter);
        return self();
    }

    @Override
    protected BulkProcessorBuilderFactory getBulkProcessorBuilderFactory() {
        return new BulkProcessorBuilderFactory() {
            /**
             * Creates a {@link BulkProcessor.Builder} whose bulk requests are sent through the
             * given client and whose flush/backoff settings come from the given config.
             *
             * <p>Max actions, max size (in MB) and flush interval are applied only when the
             * configured value differs from {@code -1}; otherwise the corresponding setter is not
             * called. A backoff policy is always set on the builder.
             *
             * @param client client whose {@code bulkAsync} is invoked for every bulk request
             * @param bulkProcessorConfig flush thresholds and backoff settings to apply
             * @param listener listener handed to the bulk processor builder
             * @return the configured builder
             * @throws IllegalArgumentException if the configured backoff type is not handled here
             */
            @Override
            public BulkProcessor.Builder apply(
                    RestHighLevelClient client,
                    BulkProcessorConfig bulkProcessorConfig,
                    BulkProcessor.Listener listener) {
                // Deliberately an anonymous class: writing this as a lambda makes
                // deserialization fail.
                final BulkRequestConsumerFactory bulkSender =
                        new BulkRequestConsumerFactory() {
                            @Override
                            public void accept(
                                    BulkRequest request,
                                    ActionListener<BulkResponse> responseListener) {
                                client.bulkAsync(request, RequestOptions.DEFAULT, responseListener);
                            }
                        };
                final BulkProcessor.Builder processorBuilder =
                        BulkProcessor.builder(bulkSender, listener);

                // -1 marks a threshold as "not configured"; such thresholds are left untouched.
                final int maxActions = bulkProcessorConfig.getBulkFlushMaxActions();
                if (maxActions != -1) {
                    processorBuilder.setBulkActions(maxActions);
                }

                final long maxSizeMb = bulkProcessorConfig.getBulkFlushMaxMb();
                if (maxSizeMb != -1) {
                    processorBuilder.setBulkSize(new ByteSizeValue(maxSizeMb, ByteSizeUnit.MB));
                }

                final long flushInterval = bulkProcessorConfig.getBulkFlushInterval();
                if (flushInterval != -1) {
                    processorBuilder.setFlushInterval(new TimeValue(flushInterval));
                }

                // Delay and retry count are read up front, regardless of the backoff type.
                final TimeValue retryDelay =
                        new TimeValue(bulkProcessorConfig.getBulkFlushBackOffDelay());
                final int retryLimit = bulkProcessorConfig.getBulkFlushBackoffRetries();
                switch (bulkProcessorConfig.getFlushBackoffType()) {
                    case NONE:
                        processorBuilder.setBackoffPolicy(BackoffPolicy.noBackoff());
                        break;
                    case CONSTANT:
                        processorBuilder.setBackoffPolicy(
                                BackoffPolicy.constantBackoff(retryDelay, retryLimit));
                        break;
                    case EXPONENTIAL:
                        processorBuilder.setBackoffPolicy(
                                BackoffPolicy.exponentialBackoff(retryDelay, retryLimit));
                        break;
                    default:
                        throw new IllegalArgumentException(
                                "Received unknown backoff policy type "
                                        + bulkProcessorConfig.getFlushBackoffType());
                }
                return processorBuilder;
            }
        };
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



