void shouldRetainMessageOrder()

in pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETest.java [93:147]


	/**
	 * Sends the message specs generated for the given scenario to a fresh topic, consumes
	 * them through a one-by-one message pipeline and asserts that the values collected
	 * for every key id from {@code 1} to {@code KEYS_COUNT} match the sequence
	 * {@code 1..ITEMS_PER_KEY_COUNT} exactly.
	 * <p>
	 * For every scenario other than {@code NO_PARALLEL} the handler is delayed by 5 ms
	 * and the pipeline is configured with {@code KEYS_COUNT} concurrency and key ordered
	 * processing.
	 * @param messageOrderScenario the scenario used to generate the messages and to
	 * decide whether the pipeline runs with concurrency
	 * @throws Exception if closing the client or pipeline fails, or waiting for the
	 * messages is interrupted
	 */
	void shouldRetainMessageOrder(MessageOrderScenario messageOrderScenario) throws Exception {
		boolean parallel = (messageOrderScenario != MessageOrderScenario.NO_PARALLEL);
		try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient()) {
			String topic = "test" + UUID.randomUUID();
			// create subscription to retain messages
			pulsarClient.newConsumer(Schema.INT32).topic(topic).subscriptionName("sub").subscribe().close();

			ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);

			// publish all generated messages and wait until the last one has been sent
			ReactiveMessageSender<Integer> sender = reactivePulsarClient.messageSender(Schema.INT32).topic(topic)
					.build();
			List<MessageSpec<Integer>> specs = generateRandomOrderedMessagesWhereSingleKeyIsOrdered(
					messageOrderScenario);
			sender.sendMany(Flux.fromIterable(specs)).blockLast();

			// received values grouped by the "keyId" property, in arrival order
			ConcurrentMap<Integer, List<Integer>> receivedByKey = new ConcurrentHashMap<>();
			// one count per sent message; reaches zero once everything was handled
			CountDownLatch remaining = new CountDownLatch(specs.size());

			List<Integer> expectedSequence = IntStream.rangeClosed(1, ITEMS_PER_KEY_COUNT).boxed()
					.collect(Collectors.toList());

			ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder<Integer> pipelineBuilder = reactivePulsarClient
					.messageConsumer(Schema.INT32).subscriptionName("sub").topic(topic).build().messagePipeline()
					.messageHandler((message) -> {
						Mono<Void> recordMessage = Mono.fromRunnable(() -> {
							Integer keyId = Integer.valueOf(message.getProperty("keyId"));
							// the list is only touched inside compute, which is atomic
							// per key
							receivedByKey.compute(keyId, (key, existing) -> {
								List<Integer> values = (existing != null) ? existing : new ArrayList<>();
								values.add(message.getValue());
								return values;
							});
							remaining.countDown();
						});
						// add delay which would lead to the execution timeout unless
						// messages are handled in parallel
						return parallel ? Mono.delay(Duration.ofMillis(5)).then(recordMessage) : recordMessage;
					});
			if (parallel) {
				pipelineBuilder.concurrency(KEYS_COUNT).useKeyOrderedProcessing();
			}
			try (ReactiveMessagePipeline ignored = pipelineBuilder.build().start()) {
				boolean allHandled = remaining.await(5, TimeUnit.SECONDS);
				assertThat(allHandled).as("processing of all messages should have completed").isTrue();
				for (int keyId = 1; keyId <= KEYS_COUNT; keyId++) {
					assertThat(receivedByKey.get(keyId)).as("keyId %d", keyId)
							.containsExactlyElementsOf(expectedSequence);
				}
			}
		}
	}