in pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java [93:147]
void shouldRetainMessageOrder(MessageOrderScenario messageOrderScenario) throws Exception {
try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient()) {
String topicName = "test" + UUID.randomUUID();
// create subscription to retain messages
pulsarClient.newConsumer(Schema.INT32).topic(topicName).subscriptionName("sub").subscribe().close();
ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
ReactiveMessageSender<Integer> messageSender = reactivePulsarClient.messageSender(Schema.INT32)
.topic(topicName).build();
List<MessageSpec<Integer>> messageSpecs = generateRandomOrderedMessagesWhereSingleKeyIsOrdered(
messageOrderScenario);
messageSender.sendMany(Flux.fromIterable(messageSpecs)).blockLast();
ConcurrentMap<Integer, List<Integer>> messages = new ConcurrentHashMap<>();
CountDownLatch latch = new CountDownLatch(messageSpecs.size());
List<Integer> orderedSequence = IntStream.rangeClosed(1, ITEMS_PER_KEY_COUNT).boxed()
.collect(Collectors.toList());
ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder<Integer> reactiveMessageHandlerBuilder = reactivePulsarClient
.messageConsumer(Schema.INT32).subscriptionName("sub").topic(topicName).build().messagePipeline()
.messageHandler((message) -> {
Mono<Void> messageHandler = Mono.fromRunnable(() -> {
Integer keyId = Integer.parseInt(message.getProperty("keyId"));
messages.compute(keyId, (k, list) -> {
if (list == null) {
list = new ArrayList<>();
}
list.add(message.getValue());
return list;
});
latch.countDown();
});
if (messageOrderScenario != MessageOrderScenario.NO_PARALLEL) {
// add delay which would lead to the execution timeout unless
// messages are handled in parallel
messageHandler = Mono.delay(Duration.ofMillis(5)).then(messageHandler);
}
return messageHandler;
});
if (messageOrderScenario != MessageOrderScenario.NO_PARALLEL) {
reactiveMessageHandlerBuilder.concurrency(KEYS_COUNT).useKeyOrderedProcessing();
}
try (ReactiveMessagePipeline ignored = reactiveMessageHandlerBuilder.build().start()) {
boolean latchCompleted = latch.await(5, TimeUnit.SECONDS);
assertThat(latchCompleted).as("processing of all messages should have completed").isTrue();
for (int i = 1; i <= KEYS_COUNT; i++) {
assertThat(messages.get(i)).as("keyId %d", i).containsExactlyElementsOf(orderedSequence);
}
}
}
}