in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [146:177]
public void send(KeyedMessage<K, V> message) {
org.apache.pulsar.client.api.Producer<byte[]> producer;
try {
producer = producers.computeIfAbsent(message.topic(), topic -> createNewProducer(topic));
} catch (Exception e) {
throw new IllegalArgumentException("Failed to create producer for " + message.topic(), e);
}
TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
buildMessage(messageBuilder, message);
if (isSendAsync) {
// what if message publish fails:
// according to : https://kafka.apache.org/08/documentation.html#producerapi
// async: opens the possibility of a failure of the client machine dropping unsent data
messageBuilder.sendAsync().handle((res, ex) -> {
if (ex != null) {
log.warn("publish failed for {}", producer.getTopic(), ex);
}
return null;
});
} else {
try {
messageBuilder.send();
} catch (PulsarClientException e) {
log.warn("publish failed for {}", producer.getTopic(), e);
throw new IllegalStateException("Failed to publish message " + message.topic(), e);
}
}
}