in src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java [117:142]
/**
 * Sends {@code messageBody} to {@code topic} without waiting for the send to complete.
 *
 * <p>The record key is the current {@link System#nanoTime()} value rendered as a string. The
 * outcome is reported by the producer's send callback, which logs it, updates the publisher
 * metrics and completes the returned future. No helper thread is parked waiting on the producer's
 * own {@code Future}, unlike an adaptation through {@code JdkFutureAdapters.listenInPoolThread}.
 *
 * <p>NOTE(review): listeners registered on the returned future with a direct executor run on
 * whichever thread invokes the send callback (presumably the producer I/O thread - confirm), so
 * they must stay lightweight.
 *
 * @param topic name of the topic the record is sent to
 * @param messageBody payload of the record
 * @return a future that completes with {@code true} when the callback reports success, fails with
 *     the exception handed to the callback when the send fails, or is an already-failed future
 *     when {@code send} itself throws
 */
private ListenableFuture<Boolean> publishAsync(String topic, String messageBody) {
  // Fully qualified so that this method does not rely on an additional import.
  com.google.common.util.concurrent.SettableFuture<Boolean> result =
      com.google.common.util.concurrent.SettableFuture.create();
  try {
    producer.send(
        new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody),
        (metadata, e) -> {
          if (metadata != null && e == null) {
            LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
            publisherMetrics.incrementBrokerPublishedMessage();
            result.set(true);
          } else {
            LOGGER.error("Cannot send the message", e);
            publisherMetrics.incrementBrokerFailedToPublishMessage();
            if (e != null) {
              result.setException(e);
            } else {
              // No metadata and no exception: mirror the previous "metadata is non-null" result.
              result.set(false);
            }
          }
        });
    return result;
  } catch (Throwable e) {
    // send() threw before the callback could be invoked: account for the failure here and hand
    // the caller an already-failed future instead of propagating the exception.
    LOGGER.error("Cannot send the message", e);
    publisherMetrics.incrementBrokerFailedToPublishMessage();
    return Futures.immediateFailedFuture(e);
  }
}