in flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxySyncV2.java [163:200]
private <T> T invokeWithRetryAndBackoff(
final ResponseSupplier<T> responseSupplier,
final long jitterBase,
final long jitterMax,
final double jitterExponent,
final int maximumNumberOfRetries)
throws InterruptedException, ExecutionException {
T response = null;
int attempt = 0;
while (attempt < maximumNumberOfRetries && response == null) {
try {
response = responseSupplier.get();
} catch (Exception ex) {
if (AwsV2Util.isRecoverableException(ex)) {
long backoffMillis =
backoff.calculateFullJitterBackoff(
jitterBase, jitterMax, jitterExponent, ++attempt);
LOG.warn(
"Encountered recoverable error: {}. Backing off for {} millis.",
ex.getClass().getSimpleName(),
backoffMillis,
ex);
backoff.sleep(backoffMillis);
} else {
throw ex;
}
}
}
if (response == null) {
throw new RuntimeException(
"Retries exceeded - all " + maximumNumberOfRetries + " retry attempts failed.");
}
return response;
}