in src/Consumer.cc [179:216]
/**
 * Starts an asynchronous subscription and returns the promise that reports its outcome.
 *
 * The consumer configuration is read from the first JavaScript argument. Depending on
 * what the configuration contains, exactly one of three subscribe calls is issued:
 *   1. pattern subscription, when the topics pattern is non-empty;
 *   2. multi-topic subscription, when the topic list is non-empty;
 *   3. single-topic subscription otherwise.
 *
 * @param info    N-API callback info; info[0] is treated as the configuration object.
 * @param cClient Shared handle to the Pulsar C client used to issue the subscribe call.
 * @return The promise of the deferred created here. If InitConfig has already completed
 *         the deferred, the promise is returned immediately and no subscribe call is made.
 */
Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
  auto deferred = ThreadSafeDeferred::New(info.Env());
  auto config = info[0].As<Napi::Object>();
  std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>();
  consumerConfig->InitConfig(deferred, config, &MessageListener);
  if (deferred->IsDone()) {
    // InitConfig already completed the deferred, so there is nothing left to subscribe to.
    return deferred->Promise();
  }

  const std::string &topic = consumerConfig->GetTopic();
  const std::vector<std::string> &topics = consumerConfig->GetTopics();
  const std::string &topicsPattern = consumerConfig->GetTopicsPattern();
  const std::string &subscription = consumerConfig->GetSubscription();

  // ctx is passed to the C API as the callback's user data.
  // NOTE(review): presumably subscribeCallback is responsible for deleting it - confirm.
  auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);
  auto cConsumerConfig = consumerConfig->GetCConsumerConfig();

  if (!topicsPattern.empty()) {
    pulsar_client_subscribe_pattern_async(cClient.get(), topicsPattern.c_str(), subscription.c_str(),
                                          cConsumerConfig.get(),
                                          &ConsumerNewInstanceContext::subscribeCallback, ctx);
  } else if (!topics.empty()) {
    // The C API expects a plain array of C strings. A vector owns that array so it is
    // released on every exit path. The pointers themselves borrow from `topics`.
    // NOTE(review): the array was always freed right after this call, so the C API is
    // assumed to copy the topic names before returning - confirm.
    std::vector<const char *> cTopics;
    cTopics.reserve(topics.size());
    for (const std::string &name : topics) {
      cTopics.push_back(name.c_str());
    }
    pulsar_client_subscribe_multi_topics_async(cClient.get(), cTopics.data(),
                                               static_cast<int>(cTopics.size()), subscription.c_str(),
                                               cConsumerConfig.get(),
                                               &ConsumerNewInstanceContext::subscribeCallback, ctx);
  } else {
    pulsar_client_subscribe_async(cClient.get(), topic.c_str(), subscription.c_str(),
                                  cConsumerConfig.get(),
                                  &ConsumerNewInstanceContext::subscribeCallback, ctx);
  }
  return deferred->Promise();
}