in flink-connector-opensearch/src/main/java/org/apache/flink/streaming/connectors/opensearch/OpensearchSink.java [459:484]
/**
 * Installs a retry backoff policy on the given bulk processor builder.
 *
 * <p>The policy is chosen as follows:
 *
 * <ul>
 *   <li>no flush backoff policy supplied: {@link BackoffPolicy#noBackoff()};
 *   <li>backoff type {@code CONSTANT}: a constant backoff built from the policy's delay and
 *       maximum retry count;
 *   <li>backoff type {@code EXPONENTIAL} or any other value: an exponential backoff built
 *       from the same two settings.
 * </ul>
 *
 * @param builder the bulk processor builder that receives the backoff policy
 * @param flushBackoffPolicy the user-provided backoff settings, or {@code null} if none
 */
private static void configureBulkProcessorBackoff(
        BulkProcessor.Builder builder,
        @Nullable OpensearchSink.BulkFlushBackoffPolicy flushBackoffPolicy) {
    // Nothing configured: install the no-backoff policy and finish.
    if (flushBackoffPolicy == null) {
        builder.setBackoffPolicy(BackoffPolicy.noBackoff());
        return;
    }

    final BackoffPolicy selectedPolicy;
    switch (flushBackoffPolicy.getBackoffType()) {
        case CONSTANT:
            {
                final TimeValue fixedDelay =
                        new TimeValue(flushBackoffPolicy.getDelayMillis());
                selectedPolicy =
                        BackoffPolicy.constantBackoff(
                                fixedDelay, flushBackoffPolicy.getMaxRetryCount());
                break;
            }
        case EXPONENTIAL:
        default:
            {
                // Exponential is also the fallback for any backoff type not handled above.
                final TimeValue configuredDelay =
                        new TimeValue(flushBackoffPolicy.getDelayMillis());
                selectedPolicy =
                        BackoffPolicy.exponentialBackoff(
                                configuredDelay, flushBackoffPolicy.getMaxRetryCount());
                break;
            }
    }

    builder.setBackoffPolicy(selectedPolicy);
}