in flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java [97:122]
public void configureBulkProcessorBackoff(
BulkProcessor.Builder builder,
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
BackoffPolicy backoffPolicy;
if (flushBackoffPolicy != null) {
switch (flushBackoffPolicy.getBackoffType()) {
case CONSTANT:
backoffPolicy =
BackoffPolicy.constantBackoff(
new TimeValue(flushBackoffPolicy.getDelayMillis()),
flushBackoffPolicy.getMaxRetryCount());
break;
case EXPONENTIAL:
default:
backoffPolicy =
BackoffPolicy.exponentialBackoff(
new TimeValue(flushBackoffPolicy.getDelayMillis()),
flushBackoffPolicy.getMaxRetryCount());
}
} else {
backoffPolicy = BackoffPolicy.noBackoff();
}
builder.setBackoffPolicy(backoffPolicy);
}