void RocketMQPushConsumer::HandleMessageInEventLoop()

in src/push_consumer.cpp [325:361]


void RocketMQPushConsumer::HandleMessageInEventLoop(uv_async_t* async)
{
    Nan::HandleScope scope;

    Isolate* isolate = Isolate::GetCurrent();
    Local<Context> context = isolate->GetCurrentContext();

    MessageHandlerParam* param = (MessageHandlerParam*)(async->data);
    RocketMQPushConsumer* consumer = param->consumer;
    ConsumerAckInner* ack_inner = param->ack;
    CMessageExt* msg = param->msg;

    // create the JavaScript ack object and then set inner ack object
    Local<Function> cons = Nan::New<Function>(ConsumerAck::GetConstructor());
    Local<Object> ack_obj = cons->NewInstance(context, 0, 0).ToLocalChecked();
    ConsumerAck* ack = ObjectWrap::Unwrap<ConsumerAck>(ack_obj);
    ack->SetInner(ack_inner);

    // TODO: const char *GetMessageProperty(CMessageExt *msgExt, const char *key);
    Local<Object> result = Nan::New<Object>();
    for(int i = 0; i < 5; i++)
    {
        Nan::Set(
            result,
            Nan::New(message_handler_param_keys[i]).ToLocalChecked(),
            Nan::New(RocketMQPushConsumer::GetMessageColumn(message_handler_param_keys[i], msg)).ToLocalChecked());
    }

    Local<Value> argv[2] = {
        result,
        ack_obj
    };
    Nan::Callback* callback = consumer->GetListenerFunction();
    callback->Call(2, argv);

    uv_close((uv_handle_t*)async, close_async_done);
}