in pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java [123:129]
/**
 * Detaches the producer currently held by this entry and passes it to
 * {@code flushAndCloseProducerAsync}.
 *
 * <p>The method is a no-op when no producer is held, or when the held reference
 * is no longer the one that was read by the time the swap to {@code null} is
 * attempted. Only the caller whose swap succeeds logs and triggers the close.
 */
private void closeProducer() {
	Producer<?> current = this.producer.get();
	if (current == null) {
		// Nothing is held, so there is nothing to close.
		return;
	}
	if (!this.producer.compareAndSet(current, null)) {
		// The reference changed after it was read; leave the new value alone.
		return;
	}
	// NOTE(review): this message is emitted before the close is initiated, and the
	// helper's name suggests the close completes asynchronously - confirm wording.
	log.info("Closed producer {} for topic {}", current.getProducerName(), current.getTopic());
	flushAndCloseProducerAsync(current);
}