in pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumer.java [77:99]
private <R> Mono<?> handleAcknowledgement(Consumer<T> consumer, MessageResult<R> messageResult,
Scheduler pinnedAcknowledgeScheduler) {
if (messageResult.getMessageId() != null) {
Mono<Void> acknowledgementMono;
if (messageResult.isAcknowledgeMessage()) {
acknowledgementMono = Mono.fromFuture(() -> consumer.acknowledgeAsync(messageResult.getMessageId()));
}
else {
acknowledgementMono = Mono
.fromRunnable(() -> consumer.negativeAcknowledge(messageResult.getMessageId()));
}
acknowledgementMono = acknowledgementMono.subscribeOn(pinnedAcknowledgeScheduler);
if (this.acknowledgeAsynchronously) {
return Mono.fromRunnable(acknowledgementMono::subscribe);
}
else {
return acknowledgementMono;
}
}
else {
return Mono.empty();
}
}