in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/RetryUtil.java [115:174]
public <T> T doRetry(Callable<T> callable, int retryTimes, long sleepTimeInMilliSecond, boolean exponential, List<Class<?>> retryExceptionClasss)
throws Exception {
if (null == callable) {
throw new IllegalArgumentException("Callable cannot be empty ! ");
}
if (retryTimes < 1) {
throw new IllegalArgumentException(String.format(
"Input parameter retry time [%d] cannot be less than 1 !", retryTimes));
}
Exception saveException = null;
for (int i = 0; i < retryTimes; i++) {
try {
return call(callable);
} catch (Exception e) {
saveException = e;
if (null != retryExceptionClasss && !retryExceptionClasss.isEmpty()) {
boolean needRetry = false;
for (Class<?> eachExceptionClass : retryExceptionClasss) {
if (eachExceptionClass == e.getClass()) {
needRetry = true;
break;
}
}
if (!needRetry) {
throw saveException;
}
}
if (i + 1 < retryTimes && sleepTimeInMilliSecond > 0) {
long startTime = System.currentTimeMillis();
long timeToSleep;
if (exponential) {
timeToSleep = sleepTimeInMilliSecond * (long) Math.pow(2, i);
if (timeToSleep >= MAX_SLEEP_MILLISECOND) {
timeToSleep = MAX_SLEEP_MILLISECOND;
}
} else {
timeToSleep = sleepTimeInMilliSecond;
if (timeToSleep >= MAX_SLEEP_MILLISECOND) {
timeToSleep = MAX_SLEEP_MILLISECOND;
}
}
try {
Thread.sleep(timeToSleep);
} catch (InterruptedException ignored) {
}
long realTimeSleep = System.currentTimeMillis() - startTime;
LOG.error(String.format("Exception when calling callable, You are about to try to execute the %s retry. This retry is scheduled to wait for [%s] ms, and the actual wait is [%s] ms. Exception Msg: [%s]",
i + 1, timeToSleep, realTimeSleep, e.getMessage()));
}
}
}
throw saveException;
}