in src/Consumer.cc [262:293]
Napi::Value Consumer::BatchReceive(const Napi::CallbackInfo &info) {
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_batch_receive_async(
this->cConsumer.get(),
[](pulsar_result result, pulsar_messages_t *rawMessages, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;
if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to batch receive message: ") + pulsar_result_str(result));
} else {
deferred->Resolve([rawMessages](const Napi::Env env) {
int listSize = pulsar_messages_size(rawMessages);
Napi::Array jsArray = Napi::Array::New(env, listSize);
for (int i = 0; i < listSize; i++) {
pulsar_message_t *rawMessage = pulsar_messages_get(rawMessages, i);
pulsar_message_t *message = pulsar_message_create();
pulsar_message_copy(rawMessage, message);
Napi::Object obj =
Message::NewInstance({}, std::shared_ptr<pulsar_message_t>(message, pulsar_message_free));
jsArray.Set(i, obj);
}
pulsar_messages_free(rawMessages);
return jsArray;
});
}
},
ctx);
return deferred->Promise();
}