in flux/src/main/java/software/amazon/aws/clients/swf/flux/util/RetryUtils.java [56:140]
/**
 * Invokes {@code function}, retrying inline (sleeping on the calling thread) when the call is throttled
 * or fails with a retryable client exception.
 *
 * <p>Each attempt increments the {@code <operationPrefix>CallCount} metric and is timed under
 * {@code <operationPrefix>Time}. On success, the throttle, retryable-client-exception, gave-up and error
 * counters are all emitted with a value of zero so that the metrics are always present.
 *
 * <p>An {@link SdkServiceException} is treated as throttling when {@code isThrottlingException()} is true
 * or when its message contains {@code THROTTLING_MESSAGE} (compared case-insensitively). An
 * {@link SdkClientException} is retried only when {@code getInnerRetryableClientException} returns non-null.
 *
 * @param function        the operation to execute
 * @param maxAttempts     once this many retryable failures have occurred, the last exception is rethrown;
 *                        a value of 1 or less means the first failure is rethrown without retrying
 * @param maxRetryDelay   upper bound handed to the backoff calculation for a single sleep
 * @param metrics         recorder that receives the call count, timing and failure counters
 * @param operationPrefix prefix used to build every metric name emitted by this method
 * @param <T>             the result type of {@code function}
 * @return the value returned by the first successful invocation of {@code function}
 * @throws SdkServiceException if the service error is not throttling, or throttling persisted for
 *                             {@code maxAttempts} attempts
 * @throws SdkClientException  if the client error is not retryable, or retryable errors persisted for
 *                             {@code maxAttempts} attempts
 * @throws RuntimeException    wrapping an {@link InterruptedException} if the thread is interrupted while
 *                             sleeping between attempts; the thread's interrupt flag is restored first
 */
public static <T> T executeWithInlineBackoff(Supplier<T> function, long maxAttempts, Duration maxRetryDelay,
                                             MetricRecorder metrics, String operationPrefix) {
    String timeMetricName = operationPrefix + "Time";
    String callCountMetricName = operationPrefix + "CallCount";
    String failureCountMetricName = operationPrefix + "Error";
    String throttleCountMetricName = operationPrefix + "Throttled";
    String retryableClientExceptionCountMetricName = operationPrefix + "RetryableClientException";
    String gaveUpRetryingThrottlingMetricName = operationPrefix + "ThrottledTooManyTimes";
    String gaveUpRetryingClientExceptionMetricName = operationPrefix + "RetriedClientExceptionTooManyTimes";

    int retry = 0;
    while (true) {
        try {
            metrics.addCount(callCountMetricName, 1);
            metrics.startDuration(timeMetricName);
            final T result;
            try {
                result = function.get();
            } finally {
                // Close the duration on every exit path (success, retryable failure, non-retryable
                // failure, or an unexpected exception) so it is never left open across a sleep or a throw.
                metrics.endDuration(timeMetricName);
            }

            // Emit explicit zeros so the failure-related metrics exist even for successful calls.
            metrics.addCount(throttleCountMetricName, 0);
            metrics.addCount(retryableClientExceptionCountMetricName, 0);
            metrics.addCount(gaveUpRetryingThrottlingMetricName, 0);
            metrics.addCount(gaveUpRetryingClientExceptionMetricName, 0);
            metrics.addCount(failureCountMetricName, 0);
            log.debug("Succeeded at executing the request after {} retries.", retry);
            return result;
        } catch (SdkServiceException e) {
            if (!e.isThrottlingException()) {
                // SWF doesn't (always?) use status code 429 for throttling, so we have to check the message too :(
                String message = e.getMessage();
                // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
                if (message == null || !message.toLowerCase(java.util.Locale.ROOT).contains(THROTTLING_MESSAGE)) {
                    metrics.addCount(failureCountMetricName, 1);
                    throw e;
                }
            }
            metrics.addCount(throttleCountMetricName, 1);
            retry++;
            if (retry >= maxAttempts) {
                log.debug("Still throttled after {} attempts. Giving up on retrying with this thread.", maxAttempts);
                metrics.addCount(gaveUpRetryingThrottlingMetricName, 1);
                throw e;
            }
            backOffBeforeRetry(retry, maxRetryDelay, "Throttled by SWF, retrying after {}ms.");
        } catch (SdkClientException e) {
            Throwable retryableInnerException = getInnerRetryableClientException(e);
            if (retryableInnerException == null) {
                throw e;
            }
            metrics.addCount(retryableClientExceptionCountMetricName, 1);
            retry++;
            if (retry >= maxAttempts) {
                log.debug("Still has retryable client exception after {} attempts."
                          + " Giving up on retrying with this thread.", maxAttempts);
                metrics.addCount(gaveUpRetryingClientExceptionMetricName, 1);
                throw e;
            }
            backOffBeforeRetry(retry, maxRetryDelay, "Retryable client Exception, retrying after {}ms.");
        }
    }
}

/**
 * Computes the backoff delay for the given retry count, logs it using {@code logFormat} and sleeps for
 * that many milliseconds on the current thread.
 *
 * @param retry         number of retryable failures seen so far
 * @param maxRetryDelay upper bound handed to the backoff calculation
 * @param logFormat     SLF4J-style format with a single {@code {}} placeholder for the sleep time
 * @throws RuntimeException wrapping an {@link InterruptedException} if the sleep is interrupted
 */
private static void backOffBeforeRetry(int retry, Duration maxRetryDelay, String logFormat) {
    // The result is treated as milliseconds, bounded by maxRetryDelay.
    // NOTE(review): the original comment described a 10 ms starting delay with 10% jitter, but the
    // literals passed are (1, 100, ..., 10) -- confirm against calculateRetryBackoff's parameter order.
    long sleepTimeMillis = calculateRetryBackoff(1, 100, maxRetryDelay.toMillis(), retry, 10,
                                                 FluxCapacitorImpl.DEFAULT_EXPONENTIAL_BACKOFF_BASE);
    log.debug(logFormat, sleepTimeMillis);
    try {
        Thread.sleep(sleepTimeMillis);
    } catch (InterruptedException ie) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(ie);
    }
}