in src/Consumer.cc [156:189]
/**
 * Completion callback for a consumer subscribe request.
 *
 * Takes the pending-request context out of `ctx`, then either rejects the
 * deferred with an error message or resolves it with a factory that builds the
 * `Consumer` wrapper object around the newly created native consumer.
 *
 * @param result       Outcome of the subscribe request; anything other than
 *                     pulsar_result_Ok rejects the deferred.
 * @param rawConsumer  Native consumer handle. On success it is wrapped in a
 *                     shared_ptr that releases it with pulsar_consumer_free;
 *                     on failure it is not touched.
 * @param ctx          Pointer to a ConsumerNewInstanceContext. It is deleted
 *                     here, so it must not be used after this call.
 */
static void subscribeCallback(pulsar_result result, pulsar_consumer_t *rawConsumer, void *ctx) {
  // Copy everything we need out of the context before destroying it.
  auto context = static_cast<ConsumerNewInstanceContext *>(ctx);
  auto deferred = context->deferred;
  auto cClient = context->cClient;  // copied with the rest of the context; not referenced again below
  auto consumerConfig = context->consumerConfig;
  delete context;

  if (result != pulsar_result_Ok) {
    std::string message("Failed to create consumer: ");
    message += pulsar_result_str(result);
    deferred->Reject(message);
    return;
  }

  std::shared_ptr<pulsar_consumer_t> consumerHandle(rawConsumer, pulsar_consumer_free);
  auto listener = consumerConfig->GetListenerCallback();

  if (listener) {
    // Pause the message listener until the Consumer wrapper below has been
    // given its native consumer; otherwise the listener could observe a null
    // consumer. It is resumed inside the resolve callback.
    pulsar_consumer_pause_message_listener(consumerHandle.get());
  }

  deferred->Resolve([consumerHandle, consumerConfig, listener](const Napi::Env env) {
    Napi::Object wrapper = Consumer::constructor.New({});
    Consumer *self = Consumer::Unwrap(wrapper);

    self->SetCConsumer(consumerHandle);
    self->SetListenerCallback(listener);

    if (listener) {
      // The wrapper is fully wired up now, so listener callbacks may run again.
      resume_message_listener(consumerHandle.get());
    }

    return wrapper;
  });
}