in flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch7/Elasticsearch7ApiCallBridge.java [101:126]
public void configureBulkProcessorBackoff(
BulkProcessor.Builder builder,
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
BackoffPolicy backoffPolicy;
if (flushBackoffPolicy != null) {
switch (flushBackoffPolicy.getBackoffType()) {
case CONSTANT:
backoffPolicy =
BackoffPolicy.constantBackoff(
new TimeValue(flushBackoffPolicy.getDelayMillis()),
flushBackoffPolicy.getMaxRetryCount());
break;
case EXPONENTIAL:
default:
backoffPolicy =
BackoffPolicy.exponentialBackoff(
new TimeValue(flushBackoffPolicy.getDelayMillis()),
flushBackoffPolicy.getMaxRetryCount());
}
} else {
backoffPolicy = BackoffPolicy.noBackoff();
}
builder.setBackoffPolicy(backoffPolicy);
}