in src/Producer.cc [56:94]
Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
auto config = info[0].As<Napi::Object>();
auto producerConfig = std::make_shared<ProducerConfig>(config);
const std::string &topic = producerConfig->GetTopic();
if (topic.empty()) {
deferred->Reject(
std::string("Topic is required and must be specified as a string when creating producer"));
return deferred->Promise();
}
auto ctx = new ProducerNewInstanceContext(deferred, cClient, producerConfig);
pulsar_client_create_producer_async(
cClient.get(), topic.c_str(), producerConfig->GetCProducerConfig().get(),
[](pulsar_result result, pulsar_producer_t *rawProducer, void *ctx) {
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
auto deferred = instanceContext->deferred;
auto cClient = instanceContext->cClient;
delete instanceContext;
if (result != pulsar_result_Ok) {
return deferred->Reject(std::string("Failed to create producer: ") + pulsar_result_str(result));
}
std::shared_ptr<pulsar_producer_t> cProducer(rawProducer, pulsar_producer_free);
deferred->Resolve([cProducer](const Napi::Env env) {
Napi::Object obj = Producer::constructor.New({});
Producer *producer = Producer::Unwrap(obj);
producer->SetCProducer(cProducer);
return obj;
});
},
ctx);
return deferred->Promise();
}