in src/Consumer.cc [295:325]
/**
 * Receives a single message from this consumer and returns a promise for it.
 *
 * Two modes, selected by the first JS argument:
 *  - undefined: starts pulsar_consumer_receive_async and returns the promise
 *    of a ThreadSafeDeferred that is settled from the receive callback.
 *  - anything else: the argument is coerced to a number and handed to a
 *    ConsumerReceiveWorker as its timeout (unit is defined by the worker;
 *    presumably milliseconds -- TODO confirm).
 *
 * @param info N-API call info; info[0] is the optional timeout.
 * @return The promise associated with the pending receive.
 */
Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
  if (info[0].IsUndefined()) {
    auto deferred = ThreadSafeDeferred::New(Env());
    // Heap-allocated because it has to travel through the C API's void*
    // user-data slot; the callback below takes ownership and deletes it.
    auto ctx = new ExtDeferredContext(deferred);
    pulsar_consumer_receive_async(
        this->cConsumer.get(),
        [](pulsar_result result, pulsar_message_t *rawMessage, void *ctx) {
          // Take the deferred out of the context and release the context
          // right away so no exit path below can leak it.
          auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
          auto deferred = deferredContext->deferred;
          delete deferredContext;
          if (result != pulsar_result_Ok) {
            deferred->Reject(std::string("Failed to receive message: ") + pulsar_result_str(result));
          } else {
            // Wrap the raw message before handing it to the resolver: the
            // closure then owns it, so it is freed via pulsar_message_free
            // even if the resolver is destroyed without ever being invoked.
            std::shared_ptr<pulsar_message_t> message(rawMessage, pulsar_message_free);
            deferred->Resolve([message](const Napi::Env env) {
              Napi::Object obj = Message::NewInstance({}, message);
              return obj;
            });
          }
        },
        ctx);
    return deferred->Promise();
  } else {
    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
    // Coerce the argument directly; casting it to Napi::Object first is
    // wrong for a JS number and fails when As<>() type checks are enabled.
    Napi::Number timeout = info[0].ToNumber();
    // NOTE(review): wk is not deleted here; presumably the worker frees
    // itself after it has run, as Napi::AsyncWorker does -- confirm.
    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
    wk->Queue();
    return deferred.Promise();
  }
}