void ConsumeMessageConcurrentlyService::ConsumeRequest()

in src/consumer/ConsumeMessageConcurrentlyService.cpp [143:324]


/**
 * Consume one batch of messages belonging to a pull request and acknowledge the outcome.
 *
 * Flow:
 *   1. Bail out if the pull request is gone, has been dropped, or the batch is empty.
 *   2. Hand the messages to the registered listener. When message trace is enabled and a
 *      consume hook is registered, messages are delivered one at a time so that the
 *      "after" hook can be fired per message; otherwise the whole batch is delivered at once.
 *   3. In CLUSTERING mode, record OK/failed TPS and, for a failed batch, try to send each
 *      message back to the broker. Messages whose send-back fails are kept for a local retry.
 *   4. Remove the handled messages from the pull request, update the consume offset and
 *      re-submit the locally retried messages.
 *
 * @param pullRequest weak reference to the pull request the messages were pulled for.
 * @param msgs        messages to consume. NOTE: this vector is modified in place (namespace
 *                    stripping/adding, re-consume counter, and removal of the messages that
 *                    are scheduled for a local retry).
 */
void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest,
                                                       vector<MQMessageExt>& msgs) {
  boost::shared_ptr<PullRequest> request = pullRequest.lock();
  if (!request) {
    LOG_WARN("Pull request has been released");
    return;
  }
  if (request->isDropped()) {
    LOG_WARN("the pull request for %s Had been dropped before", request->m_messageQueue.toString().c_str());
    // Clear cached messages to avoid a bad state when a dropped pullRequest returns normally.
    request->clearAllMsgs();
    return;
  }
  if (msgs.empty()) {
    LOG_WARN("the msg of pull result is NULL,its mq:%s", (request->m_messageQueue).toString().c_str());
    return;
  }

  ConsumeMessageContext consumeMessageContext;
  DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);

  // dynamic_cast yields NULL when m_pConsumer is not a DefaultMQPushConsumerImpl, so pConsumer
  // must never be dereferenced before it has been checked. Without a push consumer the group
  // name stays empty and tracing is disabled, but the messages are still consumed.
  std::string groupName;
  if (pConsumer != NULL) {
    groupName = pConsumer->getGroupName();
  }

  // Evaluate the trace condition once so that the "before" and "after" hooks are always paired.
  const bool traceEnabled =
      (pConsumer != NULL) && pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook();

  if (traceEnabled) {
    consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
    consumeMessageContext.setConsumerGroup(groupName);
    consumeMessageContext.setMessageQueue(request->m_messageQueue);
    consumeMessageContext.setMsgList(msgs);
    consumeMessageContext.setSuccess(false);
    consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
    pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
  }

  // Without a listener the status stays CONSUME_SUCCESS and the batch is acknowledged as-is.
  ConsumeStatus status = CONSUME_SUCCESS;
  if (m_pMessageListener != NULL) {
    resetRetryTopic(msgs);
    request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
    LOG_DEBUG("=====Receive Messages,Topic[%s], MsgId[%s],Body[%s],RetryTimes[%d]", msgs[0].getTopic().c_str(),
              msgs[0].getMsgId().c_str(), msgs[0].getBody().c_str(), msgs[0].getReconsumeTimes());
    if (m_pConsumer->isUseNameSpaceMode()) {
      MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
    }

    if (traceEnabled) {
      // For open trace message, consume message one by one.
      for (size_t i = 0; i < msgs.size(); ++i) {
        LOG_DEBUG("=====Trace Receive Messages,Topic[%s], MsgId[%s],Body[%s],RetryTimes[%d]",
                  msgs[i].getTopic().c_str(), msgs[i].getMsgId().c_str(), msgs[i].getBody().c_str(),
                  msgs[i].getReconsumeTimes());
        if (status != CONSUME_SUCCESS) {
          // A previous message failed: every message behind it is reported as failed too,
          // without being handed to the listener.
          status = RECONSUME_LATER;
          consumeMessageContext.setMsgIndex(i);
          consumeMessageContext.setStatus("RECONSUME_LATER");
          consumeMessageContext.setSuccess(false);
          pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
          continue;
        }

        std::vector<MQMessageExt> msgInner(1, msgs[i]);
        uint64 startTimeStamp = UtilAll::currentTimeMillis();
        try {
          status = m_pMessageListener->consumeMessage(msgInner);
        } catch (...) {
          // An exception escaping user code is treated as a consume failure.
          status = RECONSUME_LATER;
          LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
        }
        uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
                                                                                groupName, consumerRT);

        consumeMessageContext.setMsgIndex(i);  // indicate message position, batch consume is not supported
        if (status == CONSUME_SUCCESS) {
          consumeMessageContext.setStatus("CONSUME_SUCCESS");
          consumeMessageContext.setSuccess(true);
        } else {
          // Normalize any non-success status to RECONSUME_LATER.
          status = RECONSUME_LATER;
          consumeMessageContext.setStatus("RECONSUME_LATER");
          consumeMessageContext.setSuccess(false);
        }
        pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
      }
    } else {
      uint64 startTimeStamp = UtilAll::currentTimeMillis();
      try {
        status = m_pMessageListener->consumeMessage(msgs);
      } catch (...) {
        // An exception escaping user code is treated as a consume failure.
        status = RECONSUME_LATER;
        LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
      }
      uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
      StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
                                                                              groupName, consumerRT, msgs.size());
    }
  }

  // ackIndex == msgs.size() means the whole batch succeeded; -1 means every message
  // (index 0 and onwards) has to be re-consumed. Any status other than CONSUME_SUCCESS
  // is handled like a failure.
  const int ackIndex = (status == CONSUME_SUCCESS) ? static_cast<int>(msgs.size()) : -1;

  std::vector<MQMessageExt> localRetryMsgs;
  switch (m_pConsumer->getMessageModel()) {
    case BROADCASTING: {
      // Note: broadcasting reconsume should do by application, as it has big
      // affect to broker cluster
      if (ackIndex != static_cast<int>(msgs.size())) {
        LOG_WARN("BROADCASTING, the message consume failed, drop it:%s", (request->m_messageQueue).toString().c_str());
      }
      break;
    }
    case CLUSTERING: {
      // Consume TPS statistics.
      if (ackIndex == -1) {
        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(
            request->m_messageQueue.getTopic(), groupName, msgs.size());
      } else {
        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(request->m_messageQueue.getTopic(),
                                                                                   groupName, msgs.size());
      }

      // Send the failed messages back to the broker.
      for (size_t i = static_cast<size_t>(ackIndex + 1); i < msgs.size(); i++) {
        LOG_DEBUG("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
                  (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
                  msgs[i].getReconsumeTimes());
        if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {
          std::string brokerName = request->m_messageQueue.getBrokerName();
          if (m_pConsumer->isUseNameSpaceMode()) {
            MessageAccessor::withNameSpace(msgs[i], m_pConsumer->getNameSpace());
          }
          if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {
            // The index is a size_t, so it must be printed with SIZET_FMT rather than %d.
            LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT
                     ", re-consume times is:%d",
                     (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
                     msgs[i].getReconsumeTimes());
            // Send-back failed: bump the retry counter and keep the message for a local retry.
            msgs[i].setReconsumeTimes(msgs[i].getReconsumeTimes() + 1);
            localRetryMsgs.push_back(msgs[i]);
          }
        }
      }
      break;
    }
    default:
      break;
  }

  if (!localRetryMsgs.empty()) {
    LOG_ERROR("Client side re-consume launched due to both message consuming and SDK send-back retry failure");
    // Drop every message that is scheduled for a local retry from msgs (matched by queue
    // offset), so that it is not passed to removeMessage() below.
    for (std::vector<MQMessageExt>::iterator itOrigin = msgs.begin(); itOrigin != msgs.end();) {
      bool remove = false;
      for (std::vector<MQMessageExt>::iterator itRetry = localRetryMsgs.begin(); itRetry != localRetryMsgs.end();
           ++itRetry) {
        if (itRetry->getQueueOffset() == itOrigin->getQueueOffset()) {
          remove = true;
          break;
        }
      }
      if (remove) {
        itOrigin = msgs.erase(itOrigin);
      } else {
        ++itOrigin;
      }
    }
  }

  // Update the consume offset; a negative result from removeMessage() is not committed.
  int64 offset = request->removeMessage(msgs);
  if (offset >= 0) {
    m_pConsumer->updateConsumeOffset(request->m_messageQueue, offset);
  } else {
    LOG_WARN("Note: Get local offset for mq:%s failed, may be it is updated before. skip..",
             (request->m_messageQueue).toString().c_str());
  }

  if (!localRetryMsgs.empty()) {
    // size() is a size_t, so it must be printed with SIZET_FMT rather than %d.
    LOG_INFO("Send [" SIZET_FMT " ]messages back to mq:%s failed, call reconsume again after 1s.",
             localRetryMsgs.size(), (request->m_messageQueue).toString().c_str());
    // Delay argument is presumably in milliseconds (the log message above says 1s).
    submitConsumeRequestLater(request, localRetryMsgs, 1000);
  }
}