in src/consumer/ConsumeMessageHookImpl.cpp [45:89]
/**
 * Builds a "SubBefore" trace record for the messages held by `context` and
 * submits it through the context's push consumer.
 *
 * A new TraceContext is created and attached to `context` via
 * setTraceContext(). One TraceBean is recorded for every message whose
 * PROPERTY_TRACE_SWITCH property is not "false". The trace message itself is
 * only submitted when at least one bean was recorded.
 *
 * @param context consume context; the call is a no-op when it is NULL or its
 *                message list is empty.
 */
void ConsumeMessageHookImpl::executeHookBefore(ConsumeMessageContext* context) {
  if (context == NULL || context->getMsgList().empty()) {
    return;
  }

  // NOTE(review): presumably setTraceContext() takes ownership of this
  // allocation - confirm against ConsumeMessageContext.
  TraceContext* traceContext = new TraceContext();
  context->setTraceContext(traceContext);
  traceContext->setTraceType(SubBefore);
  traceContext->setGroupName(NameSpaceUtil::withoutNameSpace(context->getConsumerGroup(), context->getNameSpace()));

  bool hasTraceBean = false;
  std::vector<MQMessageExt> msgs = context->getMsgList();
  for (std::vector<MQMessageExt>::iterator it = msgs.begin(); it != msgs.end(); ++it) {
    // Messages that explicitly switched tracing off are not recorded.
    std::string traceOn = it->getProperty(MQMessage::PROPERTY_TRACE_SWITCH);
    if (traceOn == "false") {
      continue;
    }

    TraceBean bean;
    bean.setTopic(it->getTopic());
    bean.setMsgId(it->getMsgId());
    bean.setTags(it->getTags());
    bean.setKeys(it->getKeys());
    bean.setStoreHost(it->getStoreHostString());
    bean.setStoreTime(it->getStoreTimestamp());
    bean.setBodyLength(it->getStoreSize());
    bean.setRetryTimes(it->getReconsumeTimes());

    // Fall back to the default region when the message carries none. The
    // region of the last traced message is the one kept on the context.
    std::string regionId = it->getProperty(MQMessage::PROPERTY_MSG_REGION);
    if (regionId.empty()) {
      regionId = TraceConstant::DEFAULT_REDION;
    }
    traceContext->setRegionId(regionId);
    traceContext->setTraceBean(bean);
    hasTraceBean = true;
  }

  // The timestamp is recorded even when nothing is sent, so the context
  // attached to `context` is populated the same way on every path.
  traceContext->setTimeStamp(UtilAll::currentTimeMillis());

  // Every message opted out of tracing: there is nothing to report, so do not
  // submit a trace message that would carry no bean data.
  if (!hasTraceBean) {
    return;
  }

  std::string topic = TraceConstant::TRACE_TOPIC + traceContext->getRegionId();
  TraceTransferBean transferBean = TraceUtil::CovertTraceContextToTransferBean(traceContext);
  MQMessage message(topic, transferBean.getTransData());
  message.setKeys(transferBean.getTransKey());

  // send trace message async.
  context->getDefaultMQPushConsumer()->submitSendTraceRequest(message, consumeTraceCallback);
}