in src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/RetryUtil.java [50:74]
/**
 * Invokes {@code callable}, retrying with exponential backoff while it keeps throwing.
 *
 * <p>After each failed attempt the failure is logged (at debug level when {@code DEBUG} is
 * set, otherwise at error level) and the method waits via {@code waitForMs} before trying
 * again. The wait starts at {@code INITIAL_BACKOFF} and doubles after every attempt, capped
 * at {@code MAX_BACKOFF}.
 *
 * <p>The failure becomes terminal when {@code MAX_ATTEMPTS} retries have already been used,
 * or when the callable throws {@link InterruptedException}. In the latter case the thread's
 * interrupt status is restored and no further retry is attempted. On terminal failure the
 * {@code runningChecker}, if non-null, is switched to not running.
 *
 * @param callable the operation to execute
 * @param errorMsg context logged on every failed attempt and used as the message of the
 *     terminal exception
 * @param runningChecker optional checker that is set to not running on terminal failure; may
 *     be {@code null}
 * @param <T> result type of the callable
 * @return the value returned by the first successful invocation of {@code callable}
 * @throws RuntimeException carrying {@code errorMsg} and wrapping the last exception thrown
 *     by {@code callable} once the failure is terminal
 */
public static <T> T call(Callable<T> callable, String errorMsg, RunningChecker runningChecker)
        throws RuntimeException {
    long backoff = INITIAL_BACKOFF;
    int retries = 0;
    while (true) {
        try {
            return callable.call();
        } catch (Exception ex) {
            final boolean interrupted = ex instanceof InterruptedException;
            if (interrupted) {
                // Catching the exception must not hide the interruption request from callers.
                Thread.currentThread().interrupt();
            }
            if (interrupted || retries >= MAX_ATTEMPTS) {
                if (null != runningChecker) {
                    runningChecker.setRunning(false);
                }
                // Keep the caller-supplied context in the message and the root failure as cause.
                throw new RuntimeException(errorMsg, ex);
            }
            if (DEBUG) {
                log.debug("{}, retry {}/{}", errorMsg, retries, MAX_ATTEMPTS, ex);
            } else {
                log.error("{}, retry {}/{}", errorMsg, retries, MAX_ATTEMPTS, ex);
            }
            retries++;
        }
        // Only reached after a failed, non-terminal attempt: back off before the next try.
        waitForMs(backoff);
        backoff = Math.min(backoff * 2, MAX_BACKOFF);
    }
}