in flink-connector-elasticsearch7/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch7SinkBuilder.java [68:132]
protected BulkProcessorBuilderFactory getBulkProcessorBuilderFactory() {
    // Returns a factory that wires a BulkProcessor.Builder to the supplied client and then
    // applies the flush limits and backoff policy read from the BulkProcessorConfig.
    return new BulkProcessorBuilderFactory() {
        /**
         * Creates a bulk processor builder that sends requests through {@code client} and
         * reports to {@code listener}.
         *
         * <p>For max actions, max size and flush interval, a configured value of {@code -1}
         * leaves the corresponding builder setting untouched. The backoff policy is always
         * set explicitly.
         *
         * @throws IllegalArgumentException if the configured backoff type is not one of the
         *     types handled below
         */
        @Override
        public BulkProcessor.Builder apply(
                RestHighLevelClient client,
                BulkProcessorConfig bulkProcessorConfig,
                BulkProcessor.Listener listener) {
            // Must stay an anonymous class: rewriting it as a lambda makes
            // deserialization fail.
            final BulkRequestConsumerFactory bulkSender =
                    new BulkRequestConsumerFactory() {
                        @Override
                        public void accept(
                                BulkRequest bulkRequest,
                                ActionListener<BulkResponse> bulkResponseActionListener) {
                            client.bulkAsync(
                                    bulkRequest,
                                    RequestOptions.DEFAULT,
                                    bulkResponseActionListener);
                        }
                    };
            final BulkProcessor.Builder processorBuilder =
                    BulkProcessor.builder(bulkSender, listener);

            // Flush thresholds: only forwarded to the builder when they differ from -1.
            if (bulkProcessorConfig.getBulkFlushMaxActions() != -1) {
                processorBuilder.setBulkActions(bulkProcessorConfig.getBulkFlushMaxActions());
            }
            if (bulkProcessorConfig.getBulkFlushMaxMb() != -1) {
                final ByteSizeValue maxBulkSize =
                        new ByteSizeValue(
                                bulkProcessorConfig.getBulkFlushMaxMb(), ByteSizeUnit.MB);
                processorBuilder.setBulkSize(maxBulkSize);
            }
            if (bulkProcessorConfig.getBulkFlushInterval() != -1) {
                final TimeValue flushInterval =
                        new TimeValue(bulkProcessorConfig.getBulkFlushInterval());
                processorBuilder.setFlushInterval(flushInterval);
            }

            // Delay and retry count are read up front, regardless of which backoff type
            // ends up being selected.
            final TimeValue delay =
                    new TimeValue(bulkProcessorConfig.getBulkFlushBackOffDelay());
            final int retries = bulkProcessorConfig.getBulkFlushBackoffRetries();
            final BackoffPolicy selectedPolicy;
            switch (bulkProcessorConfig.getFlushBackoffType()) {
                case NONE:
                    selectedPolicy = BackoffPolicy.noBackoff();
                    break;
                case CONSTANT:
                    selectedPolicy = BackoffPolicy.constantBackoff(delay, retries);
                    break;
                case EXPONENTIAL:
                    selectedPolicy = BackoffPolicy.exponentialBackoff(delay, retries);
                    break;
                default:
                    throw new IllegalArgumentException(
                            "Received unknown backoff policy type "
                                    + bulkProcessorConfig.getFlushBackoffType());
            }
            processorBuilder.setBackoffPolicy(selectedPolicy);

            return processorBuilder;
        }
    };
}