in src/consumer/Rebalance.cpp [399:440]
bool Rebalance::lock(MQMessageQueue mq) {
unique_ptr<FindBrokerResult> pFindBrokerResult(
m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MASTER_ID, true));
if (!pFindBrokerResult) {
LOG_ERROR("lock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
return false;
}
unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
vector<MQMessageQueue> in_mqSet;
in_mqSet.push_back(mq);
lockBatchRequest->setMqSet(in_mqSet);
bool lockResult = false;
try {
vector<MQMessageQueue> messageQueues;
LOG_DEBUG("try to lock mq:%s", mq.toString().c_str());
m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(pFindBrokerResult->brokerAddr, lockBatchRequest.get(),
messageQueues, 1000, m_pConsumer->getSessionCredentials());
if (messageQueues.size() == 0) {
LOG_ERROR("lock mq on broker:%s failed", pFindBrokerResult->brokerAddr.c_str());
return false;
}
for (unsigned int i = 0; i != messageQueues.size(); ++i) {
boost::weak_ptr<PullRequest> pullreq = getPullRequest(messageQueues[i]);
if (!pullreq.expired()) {
LOG_INFO("lock success of mq:%s", messageQueues[i].toString().c_str());
pullreq.lock()->setLocked(true);
pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis());
lockResult = true;
} else {
LOG_ERROR("lock fails of mq:%s", messageQueues[i].toString().c_str());
}
}
messageQueues.clear();
return lockResult;
} catch (MQException& e) {
LOG_ERROR("lock fails of mq:%s", mq.toString().c_str());
return false;
}
}