void DefaultMQPushConsumerImpl::pullMessage()

in src/consumer/DefaultMQPushConsumerImpl.cpp [632:789]


void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullRequest) {
  boost::shared_ptr<PullRequest> request = pullRequest.lock();
  if (!request) {
    LOG_ERROR("Pull request is released, return");
    return;
  }
  if (request->isDropped()) {
    LOG_WARN("Pull request is set drop with mq:%s, return", (request->m_messageQueue).toString().c_str());
    // request->removePullMsgEvent();
    return;
  }

  MQMessageQueue& messageQueue = request->m_messageQueue;
  if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly) {
    if (!request->isLocked() || request->isLockExpired()) {
      if (!m_pRebalance->lock(messageQueue)) {
        request->setLastPullTimestamp(UtilAll::currentTimeMillis());
        producePullMsgTaskLater(request, 1000);
        return;
      }
    }
  }

  if (request->getCacheMsgCount() > m_maxMsgCacheSize) {
    LOG_INFO("Sync Pull request for %s has Cached with %d Messages and The Max size is %d, Sleep 1s.",
             (request->m_messageQueue).toString().c_str(), request->getCacheMsgCount(), m_maxMsgCacheSize);
    request->setLastPullTimestamp(UtilAll::currentTimeMillis());
    // Retry 1s,
    producePullMsgTaskLater(request, 1000);
    return;
  }

  bool commitOffsetEnable = false;
  int64 commitOffsetValue = 0;
  if (CLUSTERING == getMessageModel()) {
    commitOffsetValue = m_pOffsetStore->readOffset(messageQueue, READ_FROM_MEMORY, getSessionCredentials());
    if (commitOffsetValue > 0) {
      commitOffsetEnable = true;
    }
  }

  string subExpression;
  SubscriptionData* pSdata = m_pRebalance->getSubscriptionData(messageQueue.getTopic());
  if (pSdata == NULL) {
    LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep 1s.",
             (request->m_messageQueue).toString().c_str());
    producePullMsgTaskLater(request, 1000);
    return;
  }
  subExpression = pSdata->getSubString();

  int sysFlag = PullSysFlag::buildSysFlag(commitOffsetEnable,      // commitOffset
                                          false,                   // suspend
                                          !subExpression.empty(),  // subscription
                                          false);                  // class filter
  if (request->isDropped()) {
    LOG_WARN("Pull request is set as dropped with mq:%s, return", (request->m_messageQueue).toString().c_str());
    return;
  }
  try {
    uint64 startTimeStamp = UtilAll::currentTimeMillis();
    request->setLastPullTimestamp(startTimeStamp);
    unique_ptr<PullResult> result(m_pPullAPIWrapper->pullKernelImpl(messageQueue,              // 1
                                                                    subExpression,             // 2
                                                                    pSdata->getSubVersion(),   // 3
                                                                    request->getNextOffset(),  // 4
                                                                    32,                        // 5
                                                                    sysFlag,                   // 6
                                                                    commitOffsetValue,         // 7
                                                                    1000 * 15,                 // 8
                                                                    1000 * 30,                 // 9
                                                                    ComMode_SYNC,              // 10
                                                                    NULL, getSessionCredentials()));

    PullResult pullResult = m_pPullAPIWrapper->processPullResult(messageQueue, result.get(), pSdata);
    switch (pullResult.pullStatus) {
      case FOUND: {
        uint64 pullRT = UtilAll::currentTimeMillis() - startTimeStamp;
        StatsServerManager::getInstance()->getConsumeStatServer()->incPullRT(messageQueue.getTopic(), getGroupName(),
                                                                                pullRT);
        if (request->isDropped()) {
          LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s",
                   messageQueue.toString().c_str());
          break;
        }
        // and this request is dropped, and then received pulled msgs.
        request->setNextOffset(pullResult.nextBeginOffset);
        request->putMessage(pullResult.msgFoundList);
        if (!pullResult.msgFoundList.empty()) {
          StatsServerManager::getInstance()->getConsumeStatServer()->incPullTPS(messageQueue.getTopic(), getGroupName(),
                                                                                pullResult.msgFoundList.size());
        }
        m_consumerService->submitConsumeRequest(request, pullResult.msgFoundList);
        producePullMsgTask(request);

        LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ",nextBeginOffset:%lld", messageQueue.toString().c_str(),
                  pullResult.msgFoundList.size(), pullResult.nextBeginOffset);

        break;
      }
      case NO_NEW_MSG: {
        if (request->isDropped()) {
          LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s",
                   messageQueue.toString().c_str());
          break;
        }
        request->setNextOffset(pullResult.nextBeginOffset);
        if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset >= 0)) {
          updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
        }
        producePullMsgTask(request);
        LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld", messageQueue.toString().c_str(), pullResult.nextBeginOffset);
        break;
      }
      case NO_MATCHED_MSG: {
        if (request->isDropped()) {
          LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s",
                   messageQueue.toString().c_str());
          break;
        }
        request->setNextOffset(pullResult.nextBeginOffset);
        if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset >= 0)) {
          updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
        }
        producePullMsgTask(request);

        LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", messageQueue.toString().c_str(),
                  pullResult.nextBeginOffset);
        break;
      }
      case OFFSET_ILLEGAL: {
        if (request->isDropped()) {
          LOG_INFO("Get pull result but the queue has been marked as dropped. Queue: %s",
                   messageQueue.toString().c_str());
          break;
        }
        request->setNextOffset(pullResult.nextBeginOffset);
        producePullMsgTask(request);

        LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", messageQueue.toString().c_str(),
                  pullResult.nextBeginOffset);
        break;
      }
      case BROKER_TIMEOUT: {  // as BROKER_TIMEOUT is defined by client, broker
        // will not returns this status, so this case
        // could not be entered.
        LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
        request->setNextOffset(pullResult.nextBeginOffset);
        producePullMsgTask(request);
        break;
      }
    }
  } catch (MQException& e) {
    LOG_ERROR("%s", e.what());
    LOG_WARN("Pull %s occur exception, restart 1s  later.", messageQueue.toString().c_str());
    producePullMsgTaskLater(request, 1000);
  }
}