in pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageConsumerE2ETests.java [43:79]
void shouldConsumeMessages() throws Exception {
try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient();
ReactiveMessageSenderCache producerCache = AdaptedReactivePulsarClientFactory.createCache()) {
String topicName = "test" + UUID.randomUUID();
// create subscription to retain messages
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub").subscribe().close();
ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.cache(producerCache)
.topic(topicName)
.build();
messageSender.sendMany(Flux.range(1, 100).map(Object::toString).map(MessageSpec::of)).blockLast();
ReactiveMessageConsumer<String> messageConsumer = reactivePulsarClient.messageConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("sub")
.build();
List<String> messages = messageConsumer
.consumeMany((messageFlux) -> messageFlux
.map((message) -> MessageResult.acknowledge(message.getMessageId(), message.getValue())))
.take(Duration.ofSeconds(2))
.collectList()
.block();
assertThat(messages).isEqualTo(Flux.range(1, 100).map(Object::toString).collectList().block());
// should have acknowledged all messages
List<Message<String>> remainingMessages = messageConsumer
.consumeMany((messageFlux) -> messageFlux.map(MessageResult::acknowledgeAndReturn))
.take(Duration.ofSeconds(2))
.collectList()
.block();
assertThat(remainingMessages).isEmpty();
}
}