void Rebalance::doRebalance()

in src/consumer/Rebalance.cpp [55:151]


void Rebalance::doRebalance() {
  LOG_DEBUG("start doRebalance");
  try {
    map<string, SubscriptionData*>::iterator it = m_subscriptionData.begin();
    for (; it != m_subscriptionData.end(); ++it) {
      string topic = (it->first);
      LOG_DEBUG("current topic is:%s", topic.c_str());
      //<!topic -> mqs
      vector<MQMessageQueue> mqAll;
      if (!getTopicSubscribeInfo(topic, mqAll)) {
        continue;
      }
      if (mqAll.empty()) {
        if (!UtilAll::startsWith_retry(topic)) {
          std::string msg("#doRebalance. mqAll for topic:");
          msg.append(topic);
          msg.append(" is empty");
          LOG_ERROR("Queues to allocate are empty. Msg: %s", msg.c_str());
          // to check, return error or throw exception
          THROW_MQEXCEPTION(MQClientException, msg, -1);
        }
      }

      //<!msg model;
      switch (m_pConsumer->getMessageModel()) {
        case BROADCASTING: {
          bool changed = updateRequestTableInRebalance(topic, mqAll);
          if (changed) {
            messageQueueChanged(topic, mqAll, mqAll);
          }
          break;
        }
        case CLUSTERING: {
          vector<string> cidAll;
          m_pClientFactory->findConsumerIds(topic, m_pConsumer->getGroupName(), cidAll,
                                            m_pConsumer->getSessionCredentials());

          if (cidAll.empty()) {
            LOG_ERROR("[ERROR] Get empty consumer IDs. Consumer Group: %s, Topic: %s",
                      m_pConsumer->getGroupName().c_str(), topic.c_str());
            // Should skip this round of re-balance immediately if consumer ID set is empty.
            THROW_MQEXCEPTION(MQClientException, "doRebalance the cidAll is empty", -1);
          }
          // log
          for (int i = 0; i < (int)cidAll.size(); ++i) {
            LOG_DEBUG("client id:%s of topic:%s", cidAll[i].c_str(), topic.c_str());
          }
          //<! sort;
          sort(mqAll.begin(), mqAll.end());
          sort(cidAll.begin(), cidAll.end());

          //<! allocate;
          vector<MQMessageQueue> allocateResult;
          try {
            m_pAllocateMQStrategy->allocate(m_pConsumer->getMQClientId(), mqAll, cidAll, allocateResult);
          } catch (MQException& e) {
            std::string errMsg("Allocate message queue for ConsumerGroup[");
            errMsg.append(m_pConsumer->getGroupName());
            errMsg.append("],Topic[");
            errMsg.append(topic);
            errMsg.append("] failed. ");
            LOG_ERROR("%s", errMsg.c_str());
            THROW_MQEXCEPTION(MQClientException, errMsg, -1);
          }

          // log
          for (int i = 0; i < (int)allocateResult.size(); ++i) {
            LOG_DEBUG("allocate mq:%s", allocateResult[i].toString().c_str());
          }

          //<!update local;
          bool changed = updateRequestTableInRebalance(topic, allocateResult);
          if (changed) {
            std::stringstream ss;
            ss << "Allocation result for [Consumer Group: " << m_pConsumer->getGroupName() << ", Topic: " << topic
               << ", Current Consumer ID: " << m_pConsumer->getMQClientId() << "] is changed.\n"
               << "Total Queue :#" << mqAll.size() << ", Total Consumer :#" << cidAll.size()
               << ", Allocated Queues are: \n";

            for (vector<MQMessageQueue>::size_type i = 0; i < allocateResult.size(); ++i) {
              ss << allocateResult[i].toString() << "\n";
            }
            // Log allocation result.
            LOG_INFO("%s", ss.str().c_str());

            messageQueueChanged(topic, mqAll, allocateResult);
            break;
          }
        }
        default:
          break;
      }
    }
  } catch (MQException& e) {
    LOG_ERROR("%s", e.what());
  }
}