in ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java [49:91]
public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
OnsTraceContext onsTraceContext = new OnsTraceContext();
context.setMqTraceContext(onsTraceContext);
onsTraceContext.setTraceType(OnsTraceType.SubBefore);
String userGroup = NamespaceUtil.withoutNamespace(context.getConsumerGroup(), context.getNamespace());
onsTraceContext.setGroupName(userGroup);
List<OnsTraceBean> beans = new ArrayList<OnsTraceBean>();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
continue;
}
String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
if (regionId == null || regionId.equals(OnsTraceConstants.default_region)) {
// if regionId is default ,skip it
continue;
}
if (traceOn != null && "false".equals(traceOn)) {
// if trace switch is false ,skip it
continue;
}
OnsTraceBean traceBean = new OnsTraceBean();
String userTopic = NamespaceUtil.withoutNamespace(msg.getTopic(), context.getNamespace());
traceBean.setTopic(userTopic);
traceBean.setMsgId(msg.getMsgId());
traceBean.setTags(msg.getTags());
traceBean.setKeys(msg.getKeys());
traceBean.setStoreTime(msg.getStoreTimestamp());
traceBean.setBodyLength(msg.getStoreSize());
traceBean.setRetryTimes(msg.getReconsumeTimes());
onsTraceContext.setRegionId(regionId);
beans.add(traceBean);
}
if (beans.size() > 0) {
onsTraceContext.setTraceBeans(beans);
onsTraceContext.setTimeStamp(System.currentTimeMillis());
localDispatcher.append(onsTraceContext);
}
}