void Rebalance::lockAll()

in src/consumer/Rebalance.cpp [347:397]


void Rebalance::lockAll() {
  map<string, vector<MQMessageQueue>*> brokerMqs;
  MQ2PULLREQ requestQueueTable = getPullRequestTable();
  for (MQ2PULLREQ::iterator it = requestQueueTable.begin(); it != requestQueueTable.end(); ++it) {
    if (!(it->second->isDropped())) {
      string brokerKey = it->first.getBrokerName() + it->first.getTopic();
      if (brokerMqs.find(brokerKey) == brokerMqs.end()) {
        vector<MQMessageQueue>* mqs = new vector<MQMessageQueue>;
        brokerMqs[brokerKey] = mqs;
        brokerMqs[brokerKey]->push_back(it->first);
      } else {
        brokerMqs[brokerKey]->push_back(it->first);
      }
    }
  }
  LOG_INFO("LockAll " SIZET_FMT " broker mqs", brokerMqs.size());
  for (map<string, vector<MQMessageQueue>*>::iterator itb = brokerMqs.begin(); itb != brokerMqs.end(); ++itb) {
    string brokerName = (*(itb->second))[0].getBrokerName();
    unique_ptr<FindBrokerResult> pFindBrokerResult(
        m_pClientFactory->findBrokerAddressInSubscribe(brokerName, MASTER_ID, true));
    if (!pFindBrokerResult) {
      LOG_ERROR("lockAll findBrokerAddressInSubscribe ret null for broker:%s", brokerName.data());
      continue;
    }
    unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
    lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
    lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
    lockBatchRequest->setMqSet(*(itb->second));
    LOG_INFO("try to lock:" SIZET_FMT " mqs of broker:%s", itb->second->size(), itb->first.c_str());
    try {
      vector<MQMessageQueue> messageQueues;
      m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(pFindBrokerResult->brokerAddr, lockBatchRequest.get(),
                                                          messageQueues, 1000, m_pConsumer->getSessionCredentials());
      for (unsigned int i = 0; i != messageQueues.size(); ++i) {
        boost::weak_ptr<PullRequest> pullreq = getPullRequest(messageQueues[i]);
        if (!pullreq.expired()) {
          LOG_INFO("lockBatchMQ success of mq:%s", messageQueues[i].toString().c_str());
          pullreq.lock()->setLocked(true);
          pullreq.lock()->setLastLockTimestamp(UtilAll::currentTimeMillis());
        } else {
          LOG_ERROR("lockBatchMQ fails of mq:%s", messageQueues[i].toString().c_str());
        }
      }
      messageQueues.clear();
    } catch (MQException& e) {
      LOG_ERROR("lockBatchMQ fails");
    }
    deleteAndZero(itb->second);
  }
  brokerMqs.clear();
}