in src/consumer/Rebalance.cpp [347:397]
void Rebalance::lockAll() {
map<string, vector<MQMessageQueue>*> brokerMqs;
MQ2PULLREQ requestQueueTable = getPullRequestTable();
for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) {
if (!(it->second->isDropped())) {
string brokerKey = it->first.getBrokerName() + it->first.getTopic();
if (brokerMqs.find(brokerKey) == brokerMqs.end()) {
vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
brokerMqs[brokerKey] = mqs;
brokerMqs[brokerKey]->push_back(it->first);
} else {
brokerMqs[brokerKey]->push_back(it->first);
}
}
}
LOG_INFO("LockAll " SIZET_FMT " broker mqs", brokerMqs.size());
for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin(); itb != brokerMqs.end(); ++itb) {
string brokerName = (*(itb->second))[0].getBrokerName();
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(brokerName, MASTER_ID, true));
if (!pFindBrokerResult) {
LOG_ERROR("lockAll findBrokerAddressInSubscribe ret null for broker:%s", brokerName.data());
continue;
}
unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
lockBatchRequest->setMqSet(*(itb->second));
LOG_INFO("try to lock:" SIZET_FMT " mqs of broker:%s", itb->second->size(), itb->first.c_str());
try {
vector<MQMessageQueue> messageQueues;
m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(pFindBrokerResult->brokerAddr, lockBatchRequest.get(),
messageQueues, 1000, m_pConsumer->getSessionCredentials());
for (unsigned int i = 0; i != messageQueues.size(); ++i) {
boost::weak_ptr<PullRequest> pullreq = getPullRequest(messageQueues[i]);
if (!pullreq.expired()) {
LOG_INFO("lockBatchMQ success of mq:%s", messageQueues[i].toString().c_str());
pullreq.lock()->setLocked(true);
pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis());
} else {
LOG_ERROR("lockBatchMQ fails of mq:%s", messageQueues[i].toString().c_str());
}
}
messageQueues.clear();
} catch (MQException& e) {
LOG_ERROR("lockBatchMQ fails");
}
deleteAndZero(itb->second);
}
brokerMqs.clear();
}