in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [231:265]
/**
 * Asynchronously publishes {@code record} through the Pulsar producer associated with the
 * record's topic, creating and caching that producer on first use.
 *
 * <p>If the producer cannot be obtained, the callback (when present) is invoked with a
 * {@code null} metadata and the failure, and an already-failed future is returned.
 *
 * <p>On a send failure the original cause is reported: a
 * {@link java.util.concurrent.CompletionException} introduced by the intermediate
 * completion stage is unwrapped, and a failure that is already an {@link Exception} is
 * handed to the callback as-is instead of being wrapped in a generic {@code Exception}.
 *
 * @param record   the record to publish; its topic selects the underlying producer
 * @param callback optional completion callback, may be {@code null}
 * @return a future completed with the record metadata on success, or completed
 *         exceptionally with the failure cause
 */
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    org.apache.pulsar.client.api.Producer<byte[]> producer;
    try {
        producer = producers.computeIfAbsent(record.topic(), topic -> createNewProducer(topic));
    } catch (Exception e) {
        if (callback != null) {
            callback.onCompletion(null, e);
        }
        CompletableFuture<RecordMetadata> failed = new CompletableFuture<>();
        failed.completeExceptionally(e);
        return failed;
    }

    TypedMessageBuilder<byte[]> messageBuilder = producer.newMessage();
    int messageSize = buildMessage(messageBuilder, record);

    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
    messageBuilder.sendAsync().thenAccept(messageId -> {
        future.complete(getRecordMetadata(record.topic(), messageBuilder, messageId, messageSize));
    }).exceptionally(ex -> {
        // This stage depends on thenAccept(), so an upstream failure arrives wrapped in a
        // CompletionException. Strip that wrapper so dependents of `future` see the real cause.
        Throwable cause = ex;
        if (ex instanceof java.util.concurrent.CompletionException && ex.getCause() != null) {
            cause = ex.getCause();
        }
        future.completeExceptionally(cause);
        return null;
    });

    if (callback != null) {
        future.whenComplete((recordMetadata, throwable) -> {
            Exception exception = null;
            if (throwable != null) {
                // Callback only accepts Exception; wrap solely when the failure is not one
                // (e.g. an Error), so callers can match on the real exception type.
                exception = throwable instanceof Exception
                        ? (Exception) throwable
                        : new Exception(throwable);
            }
            callback.onCompletion(recordMetadata, exception);
        });
    }
    return future;
}