private ListenableFuture publishAsync()

in src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java [117:142]


  private ListenableFuture<Boolean> publishAsync(String topic, String messageBody) {
    try {
      Future<RecordMetadata> future =
          producer.send(
              new ProducerRecord<>(topic, Long.toString(System.nanoTime()), messageBody),
              (metadata, e) -> {
                if (metadata != null && e == null) {
                  LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset());
                  publisherMetrics.incrementBrokerPublishedMessage();
                } else {
                  LOGGER.error("Cannot send the message", e);
                  publisherMetrics.incrementBrokerFailedToPublishMessage();
                }
              });

      // The transformation is lightweight, so we can afford using a directExecutor
      return Futures.transform(
          JdkFutureAdapters.listenInPoolThread(future),
          Objects::nonNull,
          MoreExecutors.directExecutor());
    } catch (Throwable e) {
      LOGGER.error("Cannot send the message", e);
      publisherMetrics.incrementBrokerFailedToPublishMessage();
      return Futures.immediateFailedFuture(e);
    }
  }