public void send()

in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [146:177]


    /**
     * Publishes a single message to the topic named by {@code message.topic()}.
     *
     * <p>The producer for the topic is looked up in {@code producers} and created through
     * {@code createNewProducer} when no entry exists yet. Depending on {@code isSendAsync},
     * the message is then either dispatched asynchronously (a failure is only logged) or sent
     * synchronously (a failure is logged and rethrown to the caller).
     *
     * @param message the keyed message to publish
     * @throws IllegalArgumentException if obtaining the producer for the topic fails
     * @throws IllegalStateException if a synchronous publish fails with a
     *         {@code PulsarClientException}
     */
    public void send(KeyedMessage<K, V> message) {
        final String topicName = message.topic();

        final org.apache.pulsar.client.api.Producer<byte[]> topicProducer;
        try {
            topicProducer = producers.computeIfAbsent(topicName, name -> createNewProducer(name));
        } catch (Exception creationFailure) {
            throw new IllegalArgumentException("Failed to create producer for " + topicName, creationFailure);
        }

        final TypedMessageBuilder<byte[]> outgoing = topicProducer.newMessage();
        buildMessage(outgoing, message);

        if (!isSendAsync) {
            // Synchronous mode: the caller is told about a failed publish.
            try {
                outgoing.send();
            } catch (PulsarClientException publishFailure) {
                log.warn("publish failed for {}", topicProducer.getTopic(), publishFailure);
                throw new IllegalStateException("Failed to publish message " + topicName, publishFailure);
            }
            return;
        }

        // Asynchronous mode: a failed publish is logged and otherwise dropped. This mirrors the
        // Kafka 0.8 producer contract (https://kafka.apache.org/08/documentation.html#producerapi),
        // which states that async "opens the possibility of a failure of the client machine
        // dropping unsent data".
        outgoing.sendAsync().handle((messageId, failure) -> {
            if (failure != null) {
                log.warn("publish failed for {}", topicProducer.getTopic(), failure);
            }
            return null;
        });
    }