in src/Consumer.cc [192:248]
Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
auto config = info[0].As<Napi::Object>();
std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>(config, &MessageListener);
const std::string &topic = consumerConfig->GetTopic();
const std::vector<std::string> &topics = consumerConfig->GetTopics();
const std::string &topicsPattern = consumerConfig->GetTopicsPattern();
if (topic.empty() && topics.size() == 0 && topicsPattern.empty()) {
deferred->Reject(
std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
"creating consumer"));
return deferred->Promise();
}
const std::string &subscription = consumerConfig->GetSubscription();
if (subscription.empty()) {
deferred->Reject(
std::string("Subscription is required and must be specified as a string when creating consumer"));
return deferred->Promise();
}
int32_t ackTimeoutMs = consumerConfig->GetAckTimeoutMs();
if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
std::string msg("Ack timeout should be 0 or greater than or equal to " +
std::to_string(MIN_ACK_TIMEOUT_MILLIS));
deferred->Reject(msg);
return deferred->Promise();
}
int32_t nAckRedeliverTimeoutMs = consumerConfig->GetNAckRedeliverTimeoutMs();
if (nAckRedeliverTimeoutMs < 0) {
std::string msg("NAck timeout should be greater than or equal to zero");
deferred->Reject(msg);
return deferred->Promise();
}
auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);
if (!topicsPattern.empty()) {
pulsar_client_subscribe_pattern_async(cClient.get(), topicsPattern.c_str(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
} else if (topics.size() > 0) {
const char **cTopics = new const char *[topics.size()];
for (size_t i = 0; i < topics.size(); i++) {
cTopics[i] = topics[i].c_str();
}
pulsar_client_subscribe_multi_topics_async(cClient.get(), cTopics, topics.size(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
delete[] cTopics;
} else {
pulsar_client_subscribe_async(cClient.get(), topic.c_str(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
}
return deferred->Promise();
}