in pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java [79:97]
/**
 * Returns a {@link Mono} that emits this cache entry once it holds a connected
 * producer, replacing the producer first when the current one reports that it is no
 * longer connected.
 * <p>
 * All work is deferred until subscription. On each subscription:
 * <ul>
 * <li>if a producer is present and {@code isConnected()} is true, this entry is
 * emitted immediately;</li>
 * <li>if a producer is present but not connected, the creator reference is swapped
 * (compare-and-set) for the Mono built by
 * {@code createCachedProducerMono(producerMono)}; only when that swap succeeds is the
 * stale producer reference cleared and the stale producer passed to
 * {@code flushAndCloseProducerAsync};</li>
 * <li>in every other case (no producer yet, or the swap was lost) the method falls
 * through and waits on whichever creator Mono is currently installed.</li>
 * </ul>
 * The wait re-reads the creator reference on every attempt, keeps only a producer
 * that reports connected, and re-subscribes on an empty result up to 5 times with a
 * 1 second delay between attempts.
 * @param producerMono source used to build the replacement producer Mono
 * @param <T> the message type of the producer
 * @return a Mono emitting this entry; per Reactor's {@code repeatWhenEmpty} contract
 * it is expected to fail with an {@code IllegalStateException} once the repeat limit
 * is exceeded
 */
<T> Mono<ProducerCacheEntry> recreateIfClosed(Mono<Producer<T>> producerMono) {
	return Mono.defer(() -> {
		Producer<?> current = this.producer.get();
		boolean present = (current != null);

		// Fast path: the cached producer is still usable.
		if (present && current.isConnected()) {
			return Mono.just(this);
		}

		if (present) {
			// Snapshot the installed creator so the swap below only succeeds if
			// nobody else replaced it in the meantime.
			Mono<? extends Producer<?>> staleCreator = this.producerCreator.get();
			boolean swapped = this.producerCreator.compareAndSet(staleCreator,
					createCachedProducerMono(producerMono));
			if (swapped) {
				// Only the caller that installed the new creator retires the old
				// producer; the reference is cleared only if it still points at it.
				this.producer.compareAndSet(current, null);
				flushAndCloseProducerAsync(current);
			}
		}

		// Wait on the currently installed creator (re-read on each attempt) until
		// it yields a connected producer, then hand back this entry.
		Mono<ProducerCacheEntry> awaitConnected = Mono.defer(this.producerCreator::get)
			.filter(Producer::isConnected)
			.repeatWhenEmpty(5, (emptySignals) -> emptySignals.delayElements(Duration.ofSeconds(1)))
			.thenReturn(this);
		return awaitConnected;
	});
}