void DefaultMQPushConsumerImpl::pullMessageAsync()

in src/consumer/DefaultMQPushConsumerImpl.cpp [829:923]


// Issues one asynchronous pull for the message queue carried by `pullRequest`.
//
// Returns without doing anything further when the request has already been
// released (the weak_ptr no longer locks) or is marked as dropped.
// The request is handed back to producePullMsgTaskLater() when:
//   - the listener type is orderly, the request is not locked (or its lock is
//     expired) and m_pRebalance->lock() fails                  -> 1000
//   - the request caches more than m_maxMsgCacheSize messages   -> 3000
//   - no SubscriptionData is found for the queue's topic        -> 1000
//   - the pull throws MQException and the request is not dropped -> 1000
// Otherwise the pull is submitted through m_pPullAPIWrapper with ComMode_ASYNC.
void DefaultMQPushConsumerImpl::pullMessageAsync(boost::weak_ptr<PullRequest> pullRequest) {
  // Delays passed to producePullMsgTaskLater(); the log messages below refer to
  // them as 1s / 3s.
  const int kRetryDelay = 1000;
  const int kCacheFullDelay = 3000;

  boost::shared_ptr<PullRequest> req = pullRequest.lock();
  if (!req) {
    LOG_ERROR("Pull request is released, return");
    return;
  }

  // Alias of the queue stored inside the request; every log line and call below
  // goes through this reference.
  MQMessageQueue& mq = req->m_messageQueue;

  if (req->isDropped()) {
    LOG_WARN("Pull request is set drop with mq:%s, return", mq.toString().c_str());
    return;
  }

  // Orderly listeners need a valid lock on the queue; try to take it when it is
  // missing or expired and back off if that fails. Short-circuit evaluation keeps
  // the checks in the order: listener type, lock state, lock attempt.
  if (m_consumerService->getConsumeMsgSerivceListenerType() == messageListenerOrderly &&
      (!req->isLocked() || req->isLockExpired()) && !m_pRebalance->lock(mq)) {
    req->setLastPullTimestamp(UtilAll::currentTimeMillis());
    producePullMsgTaskLater(req, kRetryDelay);
    return;
  }

  // Too many messages already cached for this request: postpone the pull.
  if (req->getCacheMsgCount() > m_maxMsgCacheSize) {
    LOG_INFO("Pull request for [%s] has Cached with %d Messages and The Max size is %d, Sleep 3s.",
             mq.toString().c_str(), req->getCacheMsgCount(), m_maxMsgCacheSize);
    req->setLastPullTimestamp(UtilAll::currentTimeMillis());
    producePullMsgTaskLater(req, kCacheFullDelay);
    return;
  }

  // The in-memory offset is only read in CLUSTERING mode; the commit flag is set
  // only when that offset is positive.
  int64 committedOffset = 0;
  if (CLUSTERING == getMessageModel()) {
    committedOffset = m_pOffsetStore->readOffset(mq, READ_FROM_MEMORY, getSessionCredentials());
  }
  const bool hasCommittedOffset = committedOffset > 0;

  SubscriptionData* subscription = m_pRebalance->getSubscriptionData(mq.getTopic());
  if (subscription == NULL) {
    LOG_INFO("Can not get SubscriptionData of Pull request for [%s], Sleep 1s.", mq.toString().c_str());
    // Subscription data is missing; try again later.
    producePullMsgTaskLater(req, kRetryDelay);
    return;
  }
  string expression = subscription->getSubString();

  int pullFlags = PullSysFlag::buildSysFlag(hasCommittedOffset,   // commitOffset
                                            true,                 // suspend
                                            !expression.empty(),  // subscription
                                            false);               // class filter

  // NOTE(review): asyncArg lives on this stack frame and is passed by address to
  // an asynchronous call; presumably the callee copies it before returning -
  // confirm against pullKernelImpl.
  AsyncArg asyncArg;
  asyncArg.mq = mq;
  asyncArg.subData = *subscription;
  asyncArg.pPullWrapper = m_pPullAPIWrapper;

  // Re-check the drop flag right before submitting the pull.
  if (req->isDropped()) {
    LOG_WARN("Pull request is set as dropped with mq:%s, return", mq.toString().c_str());
    return;
  }

  try {
    req->setLastPullTimestamp(UtilAll::currentTimeMillis());

    AsyncPullCallback* callback = getAsyncPullCallBack(req, mq);
    if (callback == NULL) {
      LOG_WARN("Can not get pull callback for:%s, Maybe this pull request has been released.",
               mq.toString().c_str());
      return;
    }

    m_pPullAPIWrapper->pullKernelImpl(mq,                            // 1
                                      expression,                    // 2
                                      subscription->getSubVersion(),  // 3
                                      req->getNextOffset(),          // 4
                                      32,                            // 5
                                      pullFlags,                     // 6
                                      committedOffset,               // 7
                                      1000 * 15,                     // 8
                                      m_asyncPullTimeout,            // 9
                                      ComMode_ASYNC,                 // 10
                                      callback,                      // 11
                                      getSessionCredentials(),       // 12
                                      &asyncArg);                    // 13
  } catch (MQException& e) {
    LOG_ERROR("%s", e.what());
    if (!req->isDropped()) {
      // Still a live request: schedule another attempt.
      LOG_INFO("Pull %s occur exception, restart 1s  later.", mq.toString().c_str());
      producePullMsgTaskLater(req, kRetryDelay);
    } else {
      LOG_WARN("Pull request is set as dropped with mq:%s, return", mq.toString().c_str());
    }
  }
}