Napi::Value Consumer::Receive()

in src/Consumer.cc [295:325]


// Receives a single message from the wrapped Pulsar consumer and returns a JS promise.
//
// Arguments (via `info`):
//   info[0] - optional timeout. When undefined, the receive is issued through
//             pulsar_consumer_receive_async. Otherwise the value is coerced to a number
//             and handed to a ConsumerReceiveWorker as the timeout
//             (NOTE(review): units are defined by ConsumerReceiveWorker - confirm).
//
// Returns the promise of the deferred created for this call. On the async path it
// resolves with a Message built from the received pulsar_message_t, or rejects with
// "Failed to receive message: <pulsar_result_str(result)>".
Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
  if (!info[0].IsUndefined()) {
    // Timed receive: the deferred is passed to a worker that is queued here.
    Napi::Promise::Deferred deferred = Napi::Promise::Deferred::New(info.Env());
    // Napi::Value already provides ToNumber(); casting through As<Napi::Object>() first
    // reinterprets a non-object value as an object and is rejected when node-addon-api's
    // type checking on As<>() is enabled.
    Napi::Number timeout = info[0].ToNumber();
    ConsumerReceiveWorker *wk = new ConsumerReceiveWorker(deferred, this->cConsumer, timeout.Int64Value());
    wk->Queue();
    return deferred.Promise();
  }

  // Untimed receive: completion is reported through the C callback below.
  auto deferred = ThreadSafeDeferred::New(Env());
  // Heap-allocated so it can travel through the C API's void* argument; the callback deletes it.
  auto ctx = new ExtDeferredContext(deferred);
  pulsar_consumer_receive_async(
      this->cConsumer.get(),
      [](pulsar_result result, pulsar_message_t *rawMessage, void *ctx) {
        // Copy the deferred out of the context before releasing the context itself.
        auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
        auto deferred = deferredContext->deferred;
        delete deferredContext;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to receive message: ") + pulsar_result_str(result));
          return;
        }

        // Take ownership of the message right away: if the resolver below is destroyed
        // without ever being invoked, the captured shared_ptr still frees the message.
        std::shared_ptr<pulsar_message_t> message(rawMessage, pulsar_message_free);
        deferred->Resolve([message](const Napi::Env env) {
          Napi::Object obj = Message::NewInstance({}, message);
          return obj;
        });
      },
      ctx);
  return deferred->Promise();
}