void ReaderListener()

in src/Reader.cc [81:97]


/**
 * Listener trampoline: forwards a received message to the JavaScript callback
 * through the thread-safe function stored in the listener context, and blocks
 * the calling thread until the proxy signals completion.
 *
 * @param rawReader  Native reader handle; not used here.
 * @param rawMessage Received message. Ownership is taken: it is released with
 *                   pulsar_message_free once the last shared reference is dropped.
 * @param ctx        Opaque pointer to the ReaderListenerCallback registered with
 *                   the listener.
 */
void ReaderListener(pulsar_reader_t *rawReader, pulsar_message_t *rawMessage, void *ctx) {
  // Wrap the message first so it is freed on every exit path, including early returns.
  std::shared_ptr<pulsar_message_t> cMessage(rawMessage, pulsar_message_free);
  ReaderListenerCallback *readerListenerCallback = static_cast<ReaderListenerCallback *>(ctx);
  // The declared type of `reader` is not visible here, so the original cast is kept.
  Reader *reader = (Reader *)readerListenerCallback->reader;
  if (readerListenerCallback->callback.Acquire() != napi_ok) {
    // The thread-safe function could not be acquired; drop the message.
    return;
  }

  std::promise<void> promise;
  std::future<void> future = promise.get_future();
  // The lambda captures `promise` by reference; that is only safe because this
  // function waits on the future before `promise` goes out of scope.
  std::unique_ptr<ReaderListenerProxyData> dataPtr(
      new ReaderListenerProxyData(cMessage, reader, [&promise]() { promise.set_value(); }));
  const bool queued =
      readerListenerCallback->callback.BlockingCall(dataPtr.get(), ReaderListenerProxy) == napi_ok;
  // Always balance the successful Acquire() above, whether or not the call was queued.
  readerListenerCallback->callback.Release();

  if (!queued) {
    // The call was rejected (e.g. the thread-safe function is closing), so
    // ReaderListenerProxy will not run and the promise would never be fulfilled.
    // Waiting here would block this thread forever.
    return;
  }

  // Keep `dataPtr` and `promise` alive until the proxy reports completion.
  future.wait();
}