in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/handler/PulsarAdminInvocationHandler.java [104:130]
private Object doInvoke(Method method, Object[] args, int remainingTimes) throws Throwable {
while (true) {
// Make sure the request is allowed in the given rates.
requestRateLimit(requestRates);
try {
return method.invoke(handler, args);
} catch (InvocationTargetException e) {
Throwable throwable = e.getTargetException();
if (throwable instanceof NotFoundException) {
// No need to retry on such exceptions.
throw throwable;
} else if (throwable instanceof PulsarAdminException) {
remainingTimes--;
LOG.warn("Request error in Admin API, remain times: {}", remainingTimes, e);
if (remainingTimes == 0) {
throw throwable;
} else {
// Sleep for the given times before executing the next query.
sleepUninterruptibly(waitMillis, MILLISECONDS);
}
} else {
throw throwable;
}
}
}
}