void ConsumeMessageHookImpl::executeHookBefore()

in src/consumer/ConsumeMessageHookImpl.cpp [45:89]


void ConsumeMessageHookImpl::executeHookBefore(ConsumeMessageContext* context) {
  if (context == NULL || context->getMsgList().empty()) {
    return;
  }
  TraceContext* traceContext = new TraceContext();
  context->setTraceContext(traceContext);
  traceContext->setTraceType(SubBefore);
  traceContext->setGroupName(NameSpaceUtil::withoutNameSpace(context->getConsumerGroup(), context->getNameSpace()));
  std::vector<TraceBean> beans;

  std::vector<MQMessageExt> msgs = context->getMsgList();
  std::vector<MQMessageExt>::iterator it = msgs.begin();
  for (; it != msgs.end(); ++it) {
    std::string traceOn = it->getProperty(MQMessage::PROPERTY_TRACE_SWITCH);
    if (traceOn != "" && traceOn == "false") {
      continue;
    }
    TraceBean bean;
    bean.setTopic((*it).getTopic());
    bean.setMsgId((*it).getMsgId());
    bean.setTags((*it).getTags());
    bean.setKeys((*it).getKeys());
    bean.setStoreHost((*it).getStoreHostString());
    bean.setStoreTime((*it).getStoreTimestamp());
    bean.setBodyLength((*it).getStoreSize());
    bean.setRetryTimes((*it).getReconsumeTimes());
    std::string regionId = (*it).getProperty(MQMessage::PROPERTY_MSG_REGION);
    if (regionId.empty()) {
      regionId = TraceConstant::DEFAULT_REDION;
    }
    traceContext->setRegionId(regionId);
    traceContext->setTraceBean(bean);
  }
  traceContext->setTimeStamp(UtilAll::currentTimeMillis());

  std::string topic = TraceConstant::TRACE_TOPIC + traceContext->getRegionId();

  TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(traceContext);
  MQMessage message(topic, ben.getTransData());
  message.setKeys(ben.getTransKey());

  // send trace message async.
  context->getDefaultMQPushConsumer()->submitSendTraceRequest(message, consumeTraceCallback);
  return;
}