private Mono handleAcknowledgement()

in pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java [79:101]


	/**
	 * Builds the reactive step that acknowledges or negatively acknowledges the message
	 * referenced by the given result.
	 * <p>
	 * If the result carries no message id, an empty {@link Mono} is returned and nothing
	 * is sent to the consumer. Otherwise the acknowledgement (or negative
	 * acknowledgement) is subscribed on the supplied scheduler. When
	 * {@code acknowledgeAsynchronously} is set, the returned {@link Mono} only triggers
	 * that subscription and does not hand the acknowledgement's outcome to the caller;
	 * otherwise the acknowledgement {@link Mono} itself is returned.
	 * @param consumer the consumer used to send the acknowledgement
	 * @param messageResult the processing result naming the message and whether it
	 * should be acknowledged
	 * @param pinnedAcknowledgeScheduler the scheduler the acknowledgement is subscribed
	 * on
	 * @param <R> the value type of the message result
	 * @return a {@link Mono} representing the acknowledgement step
	 */
	private <R> Mono<?> handleAcknowledgement(Consumer<T> consumer, MessageResult<R> messageResult,
			Scheduler pinnedAcknowledgeScheduler) {
		// Guard clause: without a message id there is nothing to (n)ack.
		if (messageResult.getMessageId() == null) {
			return Mono.empty();
		}

		// The message id is read inside the lambdas, i.e. lazily at subscription time.
		final Mono<Void> ackOrNack = messageResult.isAcknowledgeMessage()
				? Mono.fromFuture(() -> consumer.acknowledgeAsync(messageResult.getMessageId()))
				: Mono.fromRunnable(() -> consumer.negativeAcknowledge(messageResult.getMessageId()));
		final Mono<Void> scheduledAckOrNack = ackOrNack.subscribeOn(pinnedAcknowledgeScheduler);

		if (!this.acknowledgeAsynchronously) {
			return scheduledAckOrNack;
		}
		// Fire-and-forget: subscribing to the returned Mono merely kicks off the
		// acknowledgement subscription.
		return Mono.fromRunnable(scheduledAckOrNack::subscribe);
	}