in src/consumer/Rebalance.cpp [269:314]
void Rebalance::unlockAll(bool oneWay) {
map<string, vector<MQMessageQueue>*> brokerMqs;
MQ2PULLREQ requestQueueTable = getPullRequestTable();
for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) {
if (!(it->second->isDropped())) {
if (brokerMqs.find(it->first.getBrokerName()) == brokerMqs.end()) {
vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
brokerMqs[it->first.getBrokerName()] = mqs;
} else {
brokerMqs[it->first.getBrokerName()]->push_back(it->first);
}
}
}
LOG_INFO("unLockAll " SIZET_FMT " broker mqs", brokerMqs.size());
for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin(); itb != brokerMqs.end(); ++itb) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(itb->first, MASTER_ID, true));
if (!pFindBrokerResult) {
LOG_ERROR("unlockAll findBrokerAddressInSubscribe ret null for broker:%s", itb->first.data());
continue;
}
unique_ptr<UnlockBatchRequestBody> unlockBatchRequest(new UnlockBatchRequestBody());
vector<MQMessageQueue> mqs(*(itb->second));
unlockBatchRequest->setClientId(m_pConsumer->getMQClientId());
unlockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
unlockBatchRequest->setMqSet(mqs);
try {
m_pClientFactory->getMQClientAPIImpl()->unlockBatchMQ(pFindBrokerResult->brokerAddr, unlockBatchRequest.get(),
1000, m_pConsumer->getSessionCredentials());
for (unsigned int i = 0; i != mqs.size(); ++i) {
boost::weak_ptr<PullRequest> pullreq = getPullRequest(mqs[i]);
if (!pullreq.expired()) {
LOG_INFO("unlockBatchMQ success of mq:%s", mqs[i].toString().c_str());
pullreq.lock()->setLocked(false);
} else {
LOG_ERROR("unlockBatchMQ fails of mq:%s", mqs[i].toString().c_str());
}
}
} catch (MQException& e) {
LOG_ERROR("unlockBatchMQ fails");
}
deleteAndZero(itb->second);
}
brokerMqs.clear();
}