bool RebalancePush::updateRequestTableInRebalance()

in src/consumer/Rebalance.cpp [462:553]


bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector<MQMessageQueue>& mqsSelf) {
  LOG_DEBUG("updateRequestTableInRebalance for Topic[%s] Enter", topic.c_str());

  // 1. Clear no in charge of
  //   1. set dropped
  //   2. clear local message
  //   3. clear offset
  //   4. remove request table
  //   5. set flag for route changed
  // 2. Check and clear dropped/invalid pullrequest(timeout and so on)
  // 3. Add new mq in charge of
  //   1. new pullrequest
  //   2. init next pull offset
  //   3. int  offset
  //   4. add request table
  //   5. set flag for route changed
  // 4. Start long pull for request
  if (mqsSelf.empty()) {
    LOG_WARN("allocated queue is empty for topic:%s", topic.c_str());
  }

  bool changed = false;

  //<!remove none responsive mq
  MQ2PULLREQ requestQueueTable(getPullRequestTable());
  MQ2PULLREQ::iterator itDel = requestQueueTable.begin();
  for (; itDel != requestQueueTable.end(); ++itDel) {
    MQMessageQueue mqtemp = itDel->first;
    if (mqtemp.getTopic().compare(topic) == 0) {
      if (mqsSelf.empty() || (std::find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) {
        // if not response , set to dropped
        LOG_INFO("Drop mq:%s,because not responsive", mqtemp.toString().c_str());
        itDel->second->setDropped(true);
        // remove offset table to avoid offset backup
        removeUnnecessaryMessageQueue(mqtemp);
        itDel->second->clearAllMsgs();
        removePullRequest(mqtemp);
        changed = true;
      } else if (itDel->second->isPullRequestExpired()) {
        // if pull expired , set to dropped, eg: if add pull task error, the pull request will be expired.
        LOG_INFO("Drop mq:%s according Pull timeout.", mqtemp.toString().c_str());
        itDel->second->setDropped(true);
        removeUnnecessaryMessageQueue(mqtemp);
        itDel->second->clearAllMsgs();
        removePullRequest(mqtemp);
        changed = true;
      }
    }
  }

  //<!add check new mq added.
  vector<boost::shared_ptr<PullRequest>> pullRequestsToAdd;
  vector<MQMessageQueue>::iterator itAdd = mqsSelf.begin();
  for (; itAdd != mqsSelf.end(); ++itAdd) {
    if (isPullRequestExist(*itAdd)) {
      // have check the expired pull request, re-add it.
      continue;
    }
    boost::shared_ptr<PullRequest> pullRequest = boost::make_shared<PullRequest>(m_pConsumer->getGroupName());
    pullRequest->m_messageQueue = *itAdd;
    int64 nextOffset = computePullFromWhere(*itAdd);
    if (nextOffset >= 0) {
      pullRequest->setNextOffset(nextOffset);
      pullRequest->setDropped(false);
      changed = true;
      addPullRequest(*itAdd, pullRequest);
      pullRequestsToAdd.push_back(pullRequest);
      LOG_INFO("Add mq:%s, request initial offset:%ld", (*itAdd).toString().c_str(), nextOffset);
    } else {
      LOG_WARN(
          "Failed to add pull request for %s due to failure of querying consume offset, request initial offset:%ld",
          (*itAdd).toString().c_str(), nextOffset);
    }
  }

  for (vector<boost::shared_ptr<PullRequest>>::iterator itAdded = pullRequestsToAdd.begin();
       itAdded != pullRequestsToAdd.end(); ++itAdded) {
    LOG_INFO("Start to pull %s, offset:%ld, GroupName %s", (*itAdded)->m_messageQueue.toString().c_str(),
             (*itAdded)->getNextOffset(), (*itAdded)->getGroupName().c_str());
    if (!m_pConsumer->producePullMsgTask(*itAdded)) {
      LOG_WARN(
          "Failed to producer pull message task for %s, Remove it from Request table and wait for next #Rebalance.",
          (*itAdded)->m_messageQueue.toString().c_str());
      // remove from request table, and wait for next rebalance.
      (*itAdded)->setDropped(true);
      removePullRequest((*itAdded)->m_messageQueue);
    }
  }

  LOG_DEBUG("updateRequestTableInRebalance Topic[%s] exit", topic.c_str());
  return changed;
}