Napi::Value Consumer::NewInstance()

in src/Consumer.cc [179:216]


Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
  auto deferred = ThreadSafeDeferred::New(info.Env());
  auto config = info[0].As<Napi::Object>();
  std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>();

  consumerConfig->InitConfig(deferred, config, &MessageListener);
  if (deferred->IsDone()) {
    return deferred->Promise();
  }

  const std::string &topic = consumerConfig->GetTopic();
  const std::vector<std::string> &topics = consumerConfig->GetTopics();
  const std::string &topicsPattern = consumerConfig->GetTopicsPattern();
  const std::string &subscription = consumerConfig->GetSubscription();

  auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);

  if (!topicsPattern.empty()) {
    pulsar_client_subscribe_pattern_async(cClient.get(), topicsPattern.c_str(), subscription.c_str(),
                                          consumerConfig->GetCConsumerConfig().get(),
                                          &ConsumerNewInstanceContext::subscribeCallback, ctx);
  } else if (topics.size() > 0) {
    const char **cTopics = new const char *[topics.size()];
    for (size_t i = 0; i < topics.size(); i++) {
      cTopics[i] = topics[i].c_str();
    }
    pulsar_client_subscribe_multi_topics_async(cClient.get(), cTopics, topics.size(), subscription.c_str(),
                                               consumerConfig->GetCConsumerConfig().get(),
                                               &ConsumerNewInstanceContext::subscribeCallback, ctx);
    delete[] cTopics;
  } else {
    pulsar_client_subscribe_async(cClient.get(), topic.c_str(), subscription.c_str(),
                                  consumerConfig->GetCConsumerConfig().get(),
                                  &ConsumerNewInstanceContext::subscribeCallback, ctx);
  }

  return deferred->Promise();
}