void Rebalance::unlockAll()

in src/consumer/Rebalance.cpp [269:314]


void Rebalance::unlockAll(bool oneWay) {
  map<string, vector<MQMessageQueue>*> brokerMqs;
  MQ2PULLREQ requestQueueTable = getPullRequestTable();
  for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) {
    if (!(it->second->isDropped())) {
      if (brokerMqs.find(it->first.getBrokerName()) == brokerMqs.end()) {
        vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
        brokerMqs[it->first.getBrokerName()] = mqs;
      } else {
        brokerMqs[it->first.getBrokerName()]->push_back(it->first);
      }
    }
  }
  LOG_INFO("unLockAll " SIZET_FMT " broker mqs", brokerMqs.size());
  for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin(); itb != brokerMqs.end(); ++itb) {
    unique_ptr<FindBrokerResult> pFindBrokerResult(
        m_pClientFactory->findBrokerAddressInSubscribe(itb->first, MASTER_ID, true));
    if (!pFindBrokerResult) {
      LOG_ERROR("unlockAll findBrokerAddressInSubscribe ret null for broker:%s", itb->first.data());
      continue;
    }
    unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(new UnlockBatchRequestBody());
    vector<MQMessageQueue> mqs(*(itb->second));
    unlockBatchRequest->setClientId(m_pConsumer->getMQClientId());
    unlockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
    unlockBatchRequest->setMqSet(mqs);

    try {
      m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr, unlockBatchRequest.get(),
                                                            1000, m_pConsumer->getSessionCredentials());
      for (unsigned int i = 0; i != mqs.size(); ++i) {
        boost::weak_ptr<PullRequest> pullreq = getPullRequest(mqs[i]);
        if (!pullreq.expired()) {
          LOG_INFO("unlockBatchMQ success of mq:%s", mqs[i].toString().c_str());
          pullreq.lock()->setLocked(false);
        } else {
          LOG_ERROR("unlockBatchMQ fails of mq:%s", mqs[i].toString().c_str());
        }
      }
    } catch (MQException& e) {
      LOG_ERROR("unlockBatchMQ fails");
    }
    deleteAndZero(itb->second);
  }
  brokerMqs.clear();
}