in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/nettyclient/NettyRequest.java [176:203]
private void onAttemptCompletedExceptionally(Throwable cause) throws Throwable {
metrics.remoteInvocationFailures();
LOG.warn(
"Exception caught while trying to deliver a message: (attempt #"
+ (numberOfAttempts - 1)
+ ")"
+ reqSummary,
cause);
if (client.isShutdown()) {
throw ShutdownException.INSTANCE;
}
final long delayUntilNextAttempt = delayUntilNextAttempt();
if (delayUntilNextAttempt < 0) {
throw RequestTimeoutException.INSTANCE;
}
analyzeCausalChain(cause);
LOG.info(
"Retry #"
+ numberOfAttempts
+ " "
+ reqSummary
+ " ,About to sleep for "
+ TimeUnit.NANOSECONDS.toMillis(delayUntilNextAttempt));
// better luck next time!
Preconditions.checkState(retryTask == null);
this.retryTask = client.newTimeout(this::onAttemptBackoffTimer, delayUntilNextAttempt);
}