in pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ConcurrentHashMapProducerCacheProvider.java [50:64]
/**
 * Closes every cached value that implements {@link AutoCloseable} and then
 * empties the cache.
 * <p>
 * For each cached future a callback is registered that closes the resolved value.
 * The callback runs once the future completes normally; futures that complete
 * exceptionally never invoke it. The cache is cleared only after a callback has
 * been registered on every entry, so no entry is dropped before it has been
 * scheduled for closing.
 */
public void close() {
	for (CompletableFuture<Object> future : this.cache.values()) {
		future.thenAccept((value) -> {
			if (value instanceof AutoCloseable) {
				try {
					((AutoCloseable) value).close();
				}
				catch (Exception ex) {
					// The stage returned by thenAccept is discarded, so this exception
					// only completes that stage exceptionally; it is not propagated to
					// the caller of close().
					throw new RuntimeException(ex);
				}
			}
		});
	}
	// Must happen after the loop: clearing the map while still iterating over
	// values() would cause the remaining entries to be skipped (or the iteration
	// to fail), leaving their values unclosed.
	this.cache.clear();
}