Mono recreateIfClosed()

in pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ProducerCacheEntry.java [79:99]


	<T> Mono<ProducerCacheEntry> recreateIfClosed(Mono<Producer<T>> producerMono) {
		return Mono.defer(() -> {
			Producer<?> p = this.producer.get();
			if (p != null) {
				if (p.isConnected()) {
					return Mono.just(this);
				}
				else {
					Mono<? extends Producer<?>> previousUpdater = this.producerCreator.get();
					if (this.producerCreator.compareAndSet(previousUpdater, createCachedProducerMono(producerMono))) {
						this.producer.compareAndSet(p, null);
						flushAndCloseProducerAsync(p);
					}
				}
			}
			return Mono.defer(this.producerCreator::get)
				.filter(Producer::isConnected)
				.repeatWhenEmpty(5, (flux) -> flux.delayElements(Duration.ofSeconds(1)))
				.thenReturn(this);
		});
	}