in pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java [181:214]
/**
 * Sends a record asynchronously through the Pulsar producer associated with the record's topic.
 *
 * <p>The producer is looked up in {@code producers} by topic and created via
 * {@code createNewProducer} on first use. If that lookup or creation throws, the callback
 * (when non-null) is invoked with the failure and an already-failed future is returned.
 *
 * <p>Failures of the asynchronous send are reported with their underlying cause: the
 * {@link java.util.concurrent.CompletionException} wrapper that dependent stages add is
 * stripped, and the callback receives the failure itself whenever it already is an
 * {@link Exception}, matching what the producer-creation failure path does.
 *
 * @param producerRecord the record to send; its topic selects the producer
 * @param callback completion callback, or {@code null} if none should be invoked
 * @return a future completed with the record metadata on success, or completed
 *         exceptionally with the failure cause
 */
public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
    org.apache.pulsar.client.api.Producer<byte[]> producer;
    try {
        producer = producers.computeIfAbsent(producerRecord.topic(), topic -> createNewProducer(topic));
    } catch (Exception e) {
        if (callback != null) {
            callback.onCompletion(null, e);
        }
        CompletableFuture<RecordMetadata> failed = new CompletableFuture<>();
        failed.completeExceptionally(e);
        return failed;
    }

    TypedMessageBuilder<byte[]> messageBuilder = buildMessage(producer, producerRecord);
    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

    messageBuilder.sendAsync().thenAccept((messageId) -> {
        future.complete(getRecordMetadata(producerRecord.topic(), messageId));
    }).exceptionally(ex -> {
        // This stage hangs off thenAccept, so the failure arrives wrapped in a
        // CompletionException. Strip the wrapper so the real cause is what gets reported.
        Throwable cause = ex;
        if (ex instanceof java.util.concurrent.CompletionException && ex.getCause() != null) {
            cause = ex.getCause();
        }
        future.completeExceptionally(cause);
        return null;
    });

    future.handle((recordMetadata, throwable) -> {
        if (callback != null) {
            Exception exception = null;
            if (throwable != null) {
                // Hand the original exception to the callback so its type can be inspected;
                // only wrap throwables that are not Exceptions (e.g. Errors).
                exception = throwable instanceof Exception
                        ? (Exception) throwable
                        : new Exception(throwable);
            }
            callback.onCompletion(recordMetadata, exception);
        }
        return null;
    });

    return future;
}