Napi::Value Producer::NewInstance()

in src/Producer.cc [56:94]


Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
  auto deferred = ThreadSafeDeferred::New(info.Env());
  auto config = info[0].As<Napi::Object>();
  auto producerConfig = std::make_shared<ProducerConfig>(config);

  const std::string &topic = producerConfig->GetTopic();
  if (topic.empty()) {
    deferred->Reject(
        std::string("Topic is required and must be specified as a string when creating producer"));
    return deferred->Promise();
  }

  auto ctx = new ProducerNewInstanceContext(deferred, cClient, producerConfig);

  pulsar_client_create_producer_async(
      cClient.get(), topic.c_str(), producerConfig->GetCProducerConfig().get(),
      [](pulsar_result result, pulsar_producer_t *rawProducer, void *ctx) {
        auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
        auto deferred = instanceContext->deferred;
        auto cClient = instanceContext->cClient;
        delete instanceContext;

        if (result != pulsar_result_Ok) {
          return deferred->Reject(std::string("Failed to create producer: ") + pulsar_result_str(result));
        }

        std::shared_ptr<pulsar_producer_t> cProducer(rawProducer, pulsar_producer_free);

        deferred->Resolve([cProducer](const Napi::Env env) {
          Napi::Object obj = Producer::constructor.New({});
          Producer *producer = Producer::Unwrap(obj);
          producer->SetCProducer(cProducer);
          return obj;
        });
      },
      ctx);

  return deferred->Promise();
}