void ConsumeMessageOrderlyService::ConsumeRequest()

in src/consumer/ConsumeMessageOrderlyService.cpp [139:259]


/**
 * Consumes the messages buffered in a pull request, one message per listener
 * call, while holding the pull request's critical section.
 *
 * Flow:
 *  - Returns immediately if the pull request has already been released, if its
 *    timed mutex cannot be acquired within one second, or if the request has
 *    been dropped (buffered messages are cleared in the dropped case).
 *  - Consumption only proceeds when the request is locked and the lock has not
 *    expired, or when the consumer runs in BROADCASTING mode; otherwise a
 *    delayed retry is scheduled through tryLockLaterAndReconsumeDelay().
 *  - Each message is handed to m_pMessageListener. On RECONSUME_LATER the
 *    message is put back and a delayed retry is scheduled; on any other status
 *    the consume offset is updated with the value returned by request->commit().
 *  - The loop stops when no message is left, when m_MaxTimeConsumeContinuously
 *    is exceeded, after a failed consume, or when shutdown is in progress.
 *
 * @param pullRequest weak reference to the pull request to consume from.
 */
void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest) {
  boost::shared_ptr<PullRequest> request = pullRequest.lock();
  if (!request) {
    LOG_WARN("Pull request has been released");
    return;
  }

  // Try to take the lock without blocking first; if it is busy, wait for at
  // most one second before giving up on this round.
  boost::unique_lock<boost::timed_mutex> lock(request->getPullRequestCriticalSection(), boost::try_to_lock);
  if (!lock.owns_lock() && !lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(1))) {
    LOG_ERROR("ConsumeRequest of:%s get timed_mutex timeout", request->m_messageQueue.toString().c_str());
    return;
  }

  // `request` is known to be non-null here, so only the dropped state matters.
  if (request->isDropped()) {
    LOG_WARN("the pull result is NULL or Had been dropped");
    request->clearAllMsgs();  // add clear operation to avoid bad state when
                              // dropped pullRequest returns normal
    return;
  }

  if (!m_pMessageListener) {
    return;
  }

  const bool lockIsValid = request->isLocked() && !request->isLockExpired();
  if (!lockIsValid && m_pConsumer->getMessageModel() != BROADCASTING) {
    LOG_ERROR("message queue:%s was not locked", request->m_messageQueue.toString().c_str());
    tryLockLaterAndReconsumeDelay(request, true, 1000);
    return;
  }

  // The downcast and the group name do not change between messages, so they
  // are resolved once. dynamic_cast may yield a null pointer, therefore the
  // group name is only read after the pointer has been checked; when the cast
  // fails the statistics below are recorded under an empty group name.
  DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
  std::string groupName;
  if (pConsumer) {
    groupName = pConsumer->getGroupName();
  }

  // Number of local re-consume attempts retried with the short (1s) delay;
  // beyond that the retry delay is raised to 5s.
  const int kMaxFastReconsumeTimes = 15;

  const uint64_t beginTime = UtilAll::currentTimeMillis();
  bool continueConsume = true;
  while (continueConsume) {
    if ((UtilAll::currentTimeMillis() - beginTime) > m_MaxTimeConsumeContinuously) {
      // NOTE(review): the message says 60s; the actual limit is whatever
      // m_MaxTimeConsumeContinuously holds - confirm they agree.
      LOG_INFO("Continuely consume %s more than 60s, consume it 1s later",
               request->m_messageQueue.toString().c_str());
      tryLockLaterAndReconsumeDelay(request, false, 1000);
      break;
    }

    // Messages are taken one at a time.
    std::vector<MQMessageExt> msgs;
    request->takeMessages(msgs, 1);
    if (msgs.empty()) {
      continueConsume = false;
    } else {
      request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
      if (m_pConsumer->isUseNameSpaceMode()) {
        MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
      }

      ConsumeMessageContext consumeMessageContext;
      if (pConsumer && pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
        consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
        consumeMessageContext.setConsumerGroup(pConsumer->getGroupName());
        consumeMessageContext.setMessageQueue(request->m_messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
        pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
      }

      // Measure how long the listener spends on this message.
      uint64 startTimeStamp = UtilAll::currentTimeMillis();
      ConsumeStatus consumeStatus = m_pMessageListener->consumeMessage(msgs);
      uint64 consumerRT = UtilAll::currentTimeMillis() - startTimeStamp;
      StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(request->m_messageQueue.getTopic(),
                                                                              groupName, consumerRT);

      if (consumeStatus == RECONSUME_LATER) {
        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeFailedTPS(
            request->m_messageQueue.getTopic(), groupName, 1);

        if (pConsumer) {
          consumeMessageContext.setMsgIndex(0);
          consumeMessageContext.setStatus("RECONSUME_LATER");
          consumeMessageContext.setSuccess(false);
          pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
        }

        // Put the message back and retry later; the delay grows from 1s to 5s
        // once the message has failed more than kMaxFastReconsumeTimes times.
        const bool tooManyRetries = msgs[0].getReconsumeTimes() > kMaxFastReconsumeTimes;
        if (tooManyRetries) {
          // need change to reconsumer delay level and print log.
          LOG_INFO("Local Consume failed [%d] times, change [%s] delay to 5s.", msgs[0].getReconsumeTimes(),
                   msgs[0].getMsgId().c_str());
        }
        msgs[0].setReconsumeTimes(msgs[0].getReconsumeTimes() + 1);
        request->makeMessageToCosumeAgain(msgs);
        continueConsume = false;
        tryLockLaterAndReconsumeDelay(request, false, tooManyRetries ? 5000 : 1000);
      } else {
        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeOKTPS(
            request->m_messageQueue.getTopic(), groupName, 1);
        if (pConsumer) {
          consumeMessageContext.setMsgIndex(0);
          consumeMessageContext.setStatus("CONSUME_SUCCESS");
          consumeMessageContext.setSuccess(true);
          pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
        }
        m_pConsumer->updateConsumeOffset(request->m_messageQueue, request->commit());
      }
    }

    if (m_shutdownInprogress) {
      LOG_INFO("shutdown inprogress, break the consuming");
      return;
    }
  }
  LOG_DEBUG("consume once exit of mq:%s", request->m_messageQueue.toString().c_str());
}