protected Callable wrapCallAsyncCallable()

in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/StoppableKafkaEnumContextProxy.java [227:247]


    /**
     * Decorates the given callable so that selected failures are rethrown as {@link
     * HandledFlinkKafkaException} carrying this proxy's {@code kafkaClusterId}.
     *
     * <p>A failure raised by {@code callable} is translated in two situations:
     *
     * <ul>
     *   <li>{@code isClosing} is set: the failure itself is wrapped, regardless of its type.
     *   <li>a {@link KafkaException} is found in the failure's chain and {@code
     *       kafkaMetadataService} reports the cluster as not active: the located {@link
     *       KafkaException} (not the outer failure) is wrapped.
     * </ul>
     *
     * <p>Any other failure is rethrown unchanged. A successful result is passed through as is.
     *
     * @param callable the task to run and guard
     * @param <T> result type of the task
     * @return a callable that delegates to {@code callable} and applies the translation above
     */
    protected <T> Callable<T> wrapCallAsyncCallable(Callable<T> callable) {
        return () -> {
            try {
                return callable.call();
            } catch (Exception failure) {
                // While closing, every failure is wrapped, whatever its cause.
                if (isClosing) {
                    throw new HandledFlinkKafkaException(failure, kafkaClusterId);
                }

                Optional<KafkaException> kafkaCause =
                        ExceptionUtils.findThrowable(failure, KafkaException.class);

                // Not Kafka related, or the cluster is still active: propagate untouched.
                // The metadata service is only consulted when a Kafka cause was found.
                if (!kafkaCause.isPresent()
                        || kafkaMetadataService.isClusterActive(kafkaClusterId)) {
                    throw failure;
                }

                // Kafka related failure on an inactive cluster.
                throw new HandledFlinkKafkaException(kafkaCause.get(), kafkaClusterId);
            }
        };
    }