Napi::Value Consumer::NewInstance()

in src/Consumer.cc [192:248]


Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
  auto deferred = ThreadSafeDeferred::New(info.Env());
  auto config = info[0].As<Napi::Object>();
  std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>(config, &MessageListener);

  const std::string &topic = consumerConfig->GetTopic();
  const std::vector<std::string> &topics = consumerConfig->GetTopics();
  const std::string &topicsPattern = consumerConfig->GetTopicsPattern();
  if (topic.empty() && topics.size() == 0 && topicsPattern.empty()) {
    deferred->Reject(
        std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
                    "creating consumer"));
    return deferred->Promise();
  }
  const std::string &subscription = consumerConfig->GetSubscription();
  if (subscription.empty()) {
    deferred->Reject(
        std::string("Subscription is required and must be specified as a string when creating consumer"));
    return deferred->Promise();
  }
  int32_t ackTimeoutMs = consumerConfig->GetAckTimeoutMs();
  if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
    std::string msg("Ack timeout should be 0 or greater than or equal to " +
                    std::to_string(MIN_ACK_TIMEOUT_MILLIS));
    deferred->Reject(msg);
    return deferred->Promise();
  }
  int32_t nAckRedeliverTimeoutMs = consumerConfig->GetNAckRedeliverTimeoutMs();
  if (nAckRedeliverTimeoutMs < 0) {
    std::string msg("NAck timeout should be greater than or equal to zero");
    deferred->Reject(msg);
    return deferred->Promise();
  }

  auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);

  if (!topicsPattern.empty()) {
    pulsar_client_subscribe_pattern_async(cClient.get(), topicsPattern.c_str(), subscription.c_str(),
                                          consumerConfig->GetCConsumerConfig().get(),
                                          &ConsumerNewInstanceContext::subscribeCallback, ctx);
  } else if (topics.size() > 0) {
    const char **cTopics = new const char *[topics.size()];
    for (size_t i = 0; i < topics.size(); i++) {
      cTopics[i] = topics[i].c_str();
    }
    pulsar_client_subscribe_multi_topics_async(cClient.get(), cTopics, topics.size(), subscription.c_str(),
                                               consumerConfig->GetCConsumerConfig().get(),
                                               &ConsumerNewInstanceContext::subscribeCallback, ctx);
    delete[] cTopics;
  } else {
    pulsar_client_subscribe_async(cClient.get(), topic.c_str(), subscription.c_str(),
                                  consumerConfig->GetCConsumerConfig().get(),
                                  &ConsumerNewInstanceContext::subscribeCallback, ctx);
  }

  return deferred->Promise();
}