virtual void onSuccess()

in src/consumer/DefaultMQPushConsumerImpl.cpp [46:165]


  virtual void onSuccess(MQMessageQueue& mq, PullResult& result, bool bProducePullRequest) {
    boost::shared_ptr<PullRequest> pullRequest = m_pullRequest.lock();
    if (!pullRequest) {
      LOG_WARN("Pull request for[%s] has been released", mq.toString().c_str());
      return;
    }

    if (m_bShutdown) {
      LOG_INFO("pullrequest for:%s in shutdown, return", (pullRequest->m_messageQueue).toString().c_str());
      return;
    }
    if (pullRequest->isDropped()) {
      LOG_INFO("Pull request for queue[%s] has been set as dropped. Will NOT pull this queue any more",
               pullRequest->m_messageQueue.toString().c_str());
      return;
    }
    switch (result.pullStatus) {
      case FOUND: {
        if (pullRequest->isDropped()) {
          LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
          break;
        }

        uint64 pullRT = UtilAll::currentTimeMillis() - pullRequest->getLastPullTimestamp();
        StatsServerManager::getInstance()->getConsumeStatServer()->incConsumeRT(
            pullRequest->m_messageQueue.getTopic(), m_callbackOwner->getGroupName(), pullRT);
        pullRequest->setNextOffset(result.nextBeginOffset);
        pullRequest->putMessage(result.msgFoundList);
        if (!result.msgFoundList.empty()) {
          StatsServerManager::getInstance()->getConsumeStatServer()->incPullTPS(
              pullRequest->m_messageQueue.getTopic(), m_callbackOwner->getGroupName(), result.msgFoundList.size());
        }
        m_callbackOwner->getConsumerMsgService()->submitConsumeRequest(pullRequest, result.msgFoundList);

        if (bProducePullRequest) {
          m_callbackOwner->producePullMsgTask(pullRequest);
        } else {
          LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
                   (pullRequest->m_messageQueue).toString().c_str());
        }

        LOG_DEBUG("FOUND:%s with size:" SIZET_FMT ", nextBeginOffset:%lld",
                  (pullRequest->m_messageQueue).toString().c_str(), result.msgFoundList.size(), result.nextBeginOffset);

        break;
      }
      case NO_NEW_MSG: {
        if (pullRequest->isDropped()) {
          LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
          break;
        }
        pullRequest->setNextOffset(result.nextBeginOffset);

        if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
          m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
        }
        if (bProducePullRequest) {
          m_callbackOwner->producePullMsgTask(pullRequest);
        } else {
          LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
                   (pullRequest->m_messageQueue).toString().c_str());
        }
        LOG_DEBUG("NO_NEW_MSG:%s,nextBeginOffset:%lld", pullRequest->m_messageQueue.toString().c_str(),
                  result.nextBeginOffset);
        break;
      }
      case NO_MATCHED_MSG: {
        if (pullRequest->isDropped()) {
          LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
          break;
        }
        pullRequest->setNextOffset(result.nextBeginOffset);

        if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
          m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
        }
        if (bProducePullRequest) {
          m_callbackOwner->producePullMsgTask(pullRequest);
        } else {
          LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
                   (pullRequest->m_messageQueue).toString().c_str());
        }
        LOG_DEBUG("NO_MATCHED_MSG:%s,nextBeginOffset:%lld", pullRequest->m_messageQueue.toString().c_str(),
                  result.nextBeginOffset);
        break;
      }
      case OFFSET_ILLEGAL: {
        if (pullRequest->isDropped()) {
          LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
          break;
        }
        pullRequest->setNextOffset(result.nextBeginOffset);
        if (bProducePullRequest) {
          m_callbackOwner->producePullMsgTask(pullRequest);
        } else {
          LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
                   (pullRequest->m_messageQueue).toString().c_str());
        }

        LOG_DEBUG("OFFSET_ILLEGAL:%s,nextBeginOffset:%lld", pullRequest->m_messageQueue.toString().c_str(),
                  result.nextBeginOffset);
        break;
      }
      case BROKER_TIMEOUT: {
        if (pullRequest->isDropped()) {
          LOG_INFO("[Dropped]Remove pullmsg event of mq:%s", (pullRequest->m_messageQueue).toString().c_str());
          break;
        }
        LOG_ERROR("impossible BROKER_TIMEOUT Occurs");
        pullRequest->setNextOffset(result.nextBeginOffset);
        if (bProducePullRequest) {
          m_callbackOwner->producePullMsgTask(pullRequest);
        } else {
          LOG_INFO("[bProducePullRequest = false]Stop pullmsg event of mq:%s",
                   (pullRequest->m_messageQueue).toString().c_str());
        }
        break;
      }
    }
  }