public void consumeMessageBefore()

in ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java [49:91]


    public void consumeMessageBefore(ConsumeMessageContext context) {
        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
            return;
        }
        OnsTraceContext onsTraceContext = new OnsTraceContext();
        context.setMqTraceContext(onsTraceContext);
        onsTraceContext.setTraceType(OnsTraceType.SubBefore);
        String userGroup = NamespaceUtil.withoutNamespace(context.getConsumerGroup(), context.getNamespace());
        onsTraceContext.setGroupName(userGroup);
        List<OnsTraceBean> beans = new ArrayList<OnsTraceBean>();
        for (MessageExt msg : context.getMsgList()) {
            if (msg == null) {
                continue;
            }
            String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
            String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
            if (regionId == null || regionId.equals(OnsTraceConstants.default_region)) {
                // if regionId is default ,skip it
                continue;
            }
            if (traceOn != null && "false".equals(traceOn)) {
                // if trace switch is false ,skip it
                continue;
            }
            OnsTraceBean traceBean = new OnsTraceBean();

            String userTopic = NamespaceUtil.withoutNamespace(msg.getTopic(), context.getNamespace());
            traceBean.setTopic(userTopic);
            traceBean.setMsgId(msg.getMsgId());
            traceBean.setTags(msg.getTags());
            traceBean.setKeys(msg.getKeys());
            traceBean.setStoreTime(msg.getStoreTimestamp());
            traceBean.setBodyLength(msg.getStoreSize());
            traceBean.setRetryTimes(msg.getReconsumeTimes());
            onsTraceContext.setRegionId(regionId);
            beans.add(traceBean);
        }
        if (beans.size() > 0) {
            onsTraceContext.setTraceBeans(beans);
            onsTraceContext.setTimeStamp(System.currentTimeMillis());
            localDispatcher.append(onsTraceContext);
        }
    }