in src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java [100:115]
/**
 * Sends a message to the given topic and waits for the send to complete before returning.
 *
 * <p>The record key is the current {@link System#nanoTime()} value rendered as a string. On
 * success the published-message metric is incremented; on any failure the failed-message metric
 * is incremented and the error is logged.
 *
 * @param topic the topic to send the record to
 * @param messageBody the record value
 * @return an already-completed future holding {@code true} when the send succeeded, or an
 *     already-failed future carrying the throwable raised while sending or waiting
 */
private ListenableFuture<Boolean> publishSync(String topic, String messageBody) {
  try {
    Future<RecordMetadata> future =
        producer.send(
            new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody));
    // Block until the producer reports the outcome of this send.
    RecordMetadata metadata = future.get();
    LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
    publisherMetrics.incrementBrokerPublishedMessage();
    return Futures.immediateFuture(true);
  } catch (Throwable e) {
    if (e instanceof InterruptedException) {
      // future.get() clears the interrupt status when it throws; restore it so that
      // code further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
    }
    LOGGER.error("Cannot send the message", e);
    publisherMetrics.incrementBrokerFailedToPublishMessage();
    return Futures.immediateFailedFuture(e);
  }
}