Napi::Value Consumer::BatchReceive()

in src/Consumer.cc [262:293]


// Starts an asynchronous batch receive on the underlying C consumer and returns
// a promise for its outcome.
//
// On success the promise is resolved with a JS array holding one Message object
// per entry of the received batch (each entry is copied, so the JS objects do
// not depend on the lifetime of the native batch). On failure the promise is
// rejected with "Failed to batch receive message: <result string>".
//
// `info` is not read by this method.
Napi::Value Consumer::BatchReceive(const Napi::CallbackInfo &info) {
  auto deferred = ThreadSafeDeferred::New(Env());
  // Ownership of the context is handed through the C API's void* user pointer
  // and re-adopted inside the callback below.
  auto ctx = new ExtDeferredContext(deferred);
  pulsar_consumer_batch_receive_async(
      this->cConsumer.get(),
      [](pulsar_result result, pulsar_messages_t *rawMessages, void *ctx) {
        // Adopt the context so it is released however this callback exits.
        const std::unique_ptr<ExtDeferredContext> deferredContext(static_cast<ExtDeferredContext *>(ctx));
        auto deferred = deferredContext->deferred;

        if (result != pulsar_result_Ok) {
          deferred->Reject(std::string("Failed to batch receive message: ") + pulsar_result_str(result));
          return;
        }

        // NOTE(review): presumably ThreadSafeDeferred invokes this resolver later
        // with a valid Napi::Env; the native batch is only released once the
        // resolver actually runs — confirm against ThreadSafeDeferred.
        deferred->Resolve([rawMessages](const Napi::Env env) {
          // Scoped guard: the native batch is freed on every exit path of the
          // resolver, including when an N-API call below throws.
          const std::unique_ptr<pulsar_messages_t, decltype(&pulsar_messages_free)> batchGuard(
              rawMessages, pulsar_messages_free);

          int listSize = pulsar_messages_size(rawMessages);
          Napi::Array jsArray = Napi::Array::New(env, listSize);
          for (int i = 0; i < listSize; i++) {
            pulsar_message_t *rawMessage = pulsar_messages_get(rawMessages, i);
            // The copy is owned by the shared_ptr from the moment it exists, so
            // it cannot leak between creation and wrapping.
            std::shared_ptr<pulsar_message_t> message(pulsar_message_create(), pulsar_message_free);
            pulsar_message_copy(rawMessage, message.get());
            Napi::Object obj = Message::NewInstance({}, message);
            jsArray.Set(i, obj);
          }
          return jsArray;
        });
      },
      ctx);
  return deferred->Promise();
}