in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java [363:389]
/**
 * Builds the {@link BulkProcessor} used by this sink.
 *
 * <p>The builder is obtained from the call bridge for the current client and the given
 * listener, and is always set to zero concurrent requests. The max-actions, max-size and
 * flush-interval settings are applied only when the corresponding field is non-null. The
 * backoff policy is always handed to the call bridge, even when it is {@code null}.
 *
 * @param listener the listener passed to the bulk processor builder; validated with
 *     {@code checkNotNull}
 * @return the bulk processor produced by the configured builder
 */
protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
    checkNotNull(listener);

    final BulkProcessor.Builder builder =
            callBridge.createBulkProcessorBuilder(client, listener);

    // Zero concurrent requests makes flush() a blocking call.
    builder.setConcurrentRequests(0);

    // Optional flush thresholds: each one is applied only if it was configured.
    if (bulkProcessorFlushMaxActions != null) {
        builder.setBulkActions(bulkProcessorFlushMaxActions);
    }
    if (bulkProcessorFlushMaxSizeMb != null) {
        configureBulkSize(builder);
    }
    if (bulkProcessorFlushIntervalMillis != null) {
        configureFlushInterval(builder);
    }

    // The policy is null when backoff retrying is disabled; the bridge is called regardless.
    callBridge.configureBulkProcessorBackoff(builder, bulkProcessorFlushBackoffPolicy);

    return builder.build();
}