void MessageListener()

in src/Consumer.cc [109:127]


void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessage, void *ctx) {
  std::shared_ptr<pulsar_message_t> cMessage(rawMessage, pulsar_message_free);
  MessageListenerCallback *listenerCallback = (MessageListenerCallback *)ctx;

  Consumer *consumer = (Consumer *)listenerCallback->consumer;

  if (listenerCallback->callback.Acquire() != napi_ok) {
    return;
  }

  std::promise<void> promise;
  std::future<void> future = promise.get_future();
  std::unique_ptr<MessageListenerProxyData> dataPtr(
      new MessageListenerProxyData(cMessage, consumer, [&promise]() { promise.set_value(); }));
  listenerCallback->callback.BlockingCall(dataPtr.get(), MessageListenerProxy);
  listenerCallback->callback.Release();

  future.wait();
}