in src/push_consumer.cpp [363:392]
/**
 * Message callback for a native push consumer.
 *
 * Looks up the RocketMQPushConsumer registered for `consumer_ptr`, hands the
 * message to `RocketMQPushConsumer::HandleMessageInEventLoop` through a newly
 * created `uv_async_t` on the default libuv loop, and then waits on
 * `ConsumerAckInner::WaitResult()` for the consume status.
 *
 * @param consumer_ptr  native consumer handle the message was delivered to
 * @param msg_ext       the delivered message
 * @return the status obtained from `WaitResult()`, or
 *         `E_RECONSUME_LATER` when the message could not be dispatched
 *         (unknown consumer, allocation failure, or async-handle init failure).
 */
int RocketMQPushConsumer::OnMessage(CPushConsumer* consumer_ptr, CMessageExt* msg_ext)
{
    // Use find() rather than operator[]: operator[] would insert a null entry
    // for every unknown handle and mutate the map from inside this callback.
    const auto entry = _push_consumer_map.find(consumer_ptr);
    if (entry == _push_consumer_map.end() || !entry->second)
    {
        // TODO: error handle
        return CConsumeStatus::E_RECONSUME_LATER;
    }
    RocketMQPushConsumer* consumer = entry->second;

    // `ack_inner` and `param` live on this stack frame; the async handler only
    // receives pointers to them, so this function must not return before
    // WaitResult() has delivered the handler's answer.
    ConsumerAckInner ack_inner;

    // create async parameter
    MessageHandlerParam param;
    param.consumer = consumer;
    param.ack = &ack_inner;
    param.msg = msg_ext;

    // create a new async handler and bind with `RocketMQPushConsumer::HandleMessageInEventLoop`
    // NOTE(review): the handle is presumably closed and released by the handler
    // (not visible here) -- confirm it is released with free(), matching malloc().
    uv_async_t* async = static_cast<uv_async_t*>(malloc(sizeof(uv_async_t)));
    if (!async)
    {
        // Out of memory: ask the broker to redeliver instead of dereferencing null.
        return CConsumeStatus::E_RECONSUME_LATER;
    }

    // NOTE(review): libuv documents only uv_async_send() as thread-safe; if this
    // callback runs on a thread other than the loop thread, uv_async_init() here
    // may race with the loop -- confirm.
    if (uv_async_init(uv_default_loop(), async, RocketMQPushConsumer::HandleMessageInEventLoop) != 0)
    {
        // The handle was never registered with the loop, so it can be freed
        // directly. Waiting for a result would never complete in this case.
        free(async);
        return CConsumeStatus::E_RECONSUME_LATER;
    }
    async->data = &param;

    // send async handler
    uv_async_send(async);

    // wait for result
    CConsumeStatus status = ack_inner.WaitResult();
    return status;
}