int RocketMQPushConsumer::OnMessage()

in src/push_consumer.cpp [363:392]


int RocketMQPushConsumer::OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext)
{
    RocketMQPushConsumer* consumer = _push_consumer_map[consumer_ptr];
    if (!consumer)
    {
        // TODO: error handle
        return CConsumeStatus::E_RECONSUME_LATER;
    }

    ConsumerAckInner ack_inner;

    // create async parameter
    MessageHandlerParam param;
    param.consumer = consumer;
    param.ack = &ack_inner;
    param.msg = msg_ext;

    // create a new async handler and bind with `RocketMQPushConsumer::HandleMessageInEventLoop`
    uv_async_t* async = (uv_async_t*)malloc(sizeof(uv_async_t));
    uv_async_init(uv_default_loop(), async, RocketMQPushConsumer::HandleMessageInEventLoop);
    async->data = (void*)(&param);

    // send async handler
    uv_async_send(async);

    // wait for result
    CConsumeStatus status = ack_inner.WaitResult();

    return status;
}