in src/Consumer.cc [106:124]
/**
 * Forwards a received message to the listener callback stored in `ctx` and blocks the calling
 * thread until the proxy signals that it has finished with the message.
 *
 * @param rawConsumer consumer handle supplied by the caller; not used in this function.
 * @param rawMessage  message handle; ownership is taken here and it is released with
 *                    pulsar_message_free once the last shared reference goes away.
 * @param ctx         opaque pointer that is interpreted as a MessageListenerCallback.
 */
void MessageListener(pulsar_consumer_t *rawConsumer, pulsar_message_t *rawMessage, void *ctx) {
  // Tie the message lifetime to a shared_ptr so it is freed on every exit path.
  std::shared_ptr<pulsar_message_t> cMessage(rawMessage, pulsar_message_free);
  MessageListenerCallback *listenerCallback = static_cast<MessageListenerCallback *>(ctx);
  Consumer *consumer = static_cast<Consumer *>(listenerCallback->consumerFuture.get());

  // Without a successfully acquired callback there is nothing to dispatch to.
  if (listenerCallback->callback.Acquire() != napi_ok) {
    return;
  }

  // The completion lambda captures `promise` by reference, so this frame must stay alive until
  // the lambda has run; that is what the wait at the end guarantees.
  std::promise<void> promise;
  std::future<void> future = promise.get_future();
  std::unique_ptr<MessageListenerProxyData> dataPtr(
      new MessageListenerProxyData(cMessage, consumer, [&promise]() { promise.set_value(); }));

  const auto status = listenerCallback->callback.BlockingCall(dataPtr.get(), MessageListenerProxy);
  // NOTE(review): Node-API documents that a thread-safe function should not be used again after
  // a call returned napi_closing - confirm whether Release() must be skipped in that case.
  listenerCallback->callback.Release();

  if (status != napi_ok) {
    // The call was not queued, so the completion lambda will never be invoked (assuming only
    // MessageListenerProxy triggers it - TODO confirm). Waiting here would block forever.
    return;
  }

  future.wait();
}