void shouldSendMessageToTopicWithCachedProducer()

in pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETest.java [84:104]


	void shouldSendMessageToTopicWithCachedProducer(String name, ReactiveMessageSenderCache cacheInstance)
			throws Exception {
		try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient();
				ReactiveMessageSenderCache producerCache = cacheInstance) {
			String topicName = "test" + UUID.randomUUID();
			try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
					.subscriptionName("sub").subscribe()) {

				ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);

				ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
						.cache(producerCache).maxInflight(1).topic(topicName).build();
				MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
				assertThat(messageId).isNotNull();

				Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
				assertThat(message).isNotNull();
				assertThat(message.getValue()).isEqualTo("Hello world!");
			}
		}
	}