in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java [227:247]
/**
 * Decorates the given callable so that selected failures are rethrown as
 * {@link HandledFlinkKafkaException}.
 *
 * <p>An exception thrown by {@code callable} is translated as follows:
 *
 * <ul>
 *   <li>if {@code isClosing} is set, the exception itself is wrapped in a
 *       {@link HandledFlinkKafkaException} tagged with {@code kafkaClusterId};
 *   <li>otherwise, if {@code ExceptionUtils.findThrowable} locates a {@link KafkaException}
 *       for it and {@code kafkaMetadataService} reports the cluster as not active, that
 *       {@link KafkaException} is wrapped in a {@link HandledFlinkKafkaException};
 *   <li>in every other case the original exception is rethrown unchanged.
 * </ul>
 *
 * @param callable the task to run and guard
 * @param <T> the result type of the task
 * @return a callable that returns the result of {@code callable} or throws as described above
 */
protected <T> Callable<T> wrapCallAsyncCallable(Callable<T> callable) {
    return () -> {
        try {
            return callable.call();
        } catch (Exception failure) {
            // While closing, every failure is reported as handled, whatever its cause.
            if (isClosing) {
                throw new HandledFlinkKafkaException(failure, kafkaClusterId);
            }

            final Optional<KafkaException> kafkaCause =
                    ExceptionUtils.findThrowable(failure, KafkaException.class);

            // Not Kafka related, or the cluster is still active: propagate as-is.
            // The metadata service is only consulted when a KafkaException was found.
            if (!kafkaCause.isPresent()
                    || kafkaMetadataService.isClusterActive(kafkaClusterId)) {
                throw failure;
            }

            // Kafka related failure on an inactive cluster: report it as handled.
            throw new HandledFlinkKafkaException(kafkaCause.get(), kafkaClusterId);
        }
    };
}