in pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessageSenderE2ETests.java [62:83]
void shouldSendMessageToTopic() throws PulsarClientException {
try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient()) {
String topicName = "test" + UUID.randomUUID();
try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("sub")
.subscribe()) {
ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
ReactiveMessageSender<String> messageSender = reactivePulsarClient.messageSender(Schema.STRING)
.topic(topicName)
.build();
MessageId messageId = messageSender.sendOne(MessageSpec.of("Hello world!")).block();
assertThat(messageId).isNotNull();
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
assertThat(message).isNotNull();
assertThat(message.getValue()).isEqualTo("Hello world!");
}
}
}