void shouldRetainMessageOrder()

in pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java [99:159]


	void shouldRetainMessageOrder(MessageOrderScenario messageOrderScenario) throws Exception {
		try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient()) {
			String topicName = "test" + UUID.randomUUID();
			// create subscription to retain messages
			pulsarClient.newConsumer(Schema.INT32).topic(topicName).subscriptionName("sub").subscribe().close();

			ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);

			ReactiveMessageSender<Integer> messageSender = reactivePulsarClient.messageSender(Schema.INT32)
				.topic(topicName)
				.build();

			List<MessageSpec<Integer>> messageSpecs = generateRandomOrderedMessagesWhereSingleKeyIsOrdered(
					messageOrderScenario);

			messageSender.sendMany(Flux.fromIterable(messageSpecs)).blockLast();

			ConcurrentMap<Integer, List<Integer>> messages = new ConcurrentHashMap<>();
			CountDownLatch latch = new CountDownLatch(messageSpecs.size());

			List<Integer> orderedSequence = IntStream.rangeClosed(1, ITEMS_PER_KEY_COUNT)
				.boxed()
				.collect(Collectors.toList());

			ReactiveMessagePipelineBuilder.OneByOneMessagePipelineBuilder<Integer> reactiveMessageHandlerBuilder = reactivePulsarClient
				.messageConsumer(Schema.INT32)
				.subscriptionName("sub")
				.topic(topicName)
				.build()
				.messagePipeline()
				.messageHandler((message) -> {
					Mono<Void> messageHandler = Mono.fromRunnable(() -> {
						Integer keyId = Integer.parseInt(message.getProperty("keyId"));
						messages.compute(keyId, (k, list) -> {
							if (list == null) {
								list = new ArrayList<>();
							}
							list.add(message.getValue());
							return list;
						});
						latch.countDown();
					});
					if (messageOrderScenario != MessageOrderScenario.NO_PARALLEL) {
						// add delay which would lead to the execution timeout unless
						// messages are handled in parallel
						messageHandler = Mono.delay(Duration.ofMillis(5)).then(messageHandler);
					}
					return messageHandler;
				});
			if (messageOrderScenario != MessageOrderScenario.NO_PARALLEL) {
				reactiveMessageHandlerBuilder.concurrency(KEYS_COUNT).useKeyOrderedProcessing();
			}
			try (ReactiveMessagePipeline ignored = reactiveMessageHandlerBuilder.build().start()) {
				boolean latchCompleted = latch.await(5, TimeUnit.SECONDS);
				assertThat(latchCompleted).as("processing of all messages should have completed").isTrue();
				for (int i = 1; i <= KEYS_COUNT; i++) {
					assertThat(messages.get(i)).as("keyId %d", i).containsExactlyElementsOf(orderedSequence);
				}
			}
		}
	}