bool Rebalance::lock()

in src/consumer/Rebalance.cpp [399:440]


// Tries to lock a single message queue on its master broker.
//
// Looks up the broker address for mq's broker name (MASTER_ID), sends a
// lock-batch request containing only `mq` on behalf of this consumer's
// client id and group, and then marks the PullRequest of every queue the
// broker reported back as locked, stamping it with the current time.
//
// Returns true if at least one returned queue had a live PullRequest that
// was marked locked. Returns false when the broker address cannot be found,
// when the broker returns no queues, when none of the returned queues has a
// live PullRequest, or when lockBatchMQ throws MQException.
bool Rebalance::lock(MQMessageQueue mq) {
  unique_ptr<FindBrokerResult> pFindBrokerResult(
      m_pClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MASTER_ID, true));
  if (!pFindBrokerResult) {
    LOG_ERROR("lock findBrokerAddressInSubscribe ret null for broker:%s", mq.getBrokerName().data());
    return false;
  }

  // Build a batch request that carries just this one queue.
  unique_ptr<LockBatchRequestBody> lockBatchRequest(new LockBatchRequestBody());
  lockBatchRequest->setClientId(m_pConsumer->getMQClientId());
  lockBatchRequest->setConsumerGroup(m_pConsumer->getGroupName());
  vector<MQMessageQueue> in_mqSet;
  in_mqSet.push_back(mq);
  lockBatchRequest->setMqSet(in_mqSet);

  try {
    // Filled by lockBatchMQ with the queues the broker reports back.
    vector<MQMessageQueue> messageQueues;
    LOG_DEBUG("try to lock mq:%s", mq.toString().c_str());
    // NOTE(review): 1000 is presumably the request timeout in milliseconds - confirm against lockBatchMQ.
    m_pClientFactory->getMQClientAPIImpl()->lockBatchMQ(pFindBrokerResult->brokerAddr, lockBatchRequest.get(),
                                                        messageQueues, 1000, m_pConsumer->getSessionCredentials());
    if (messageQueues.empty()) {
      LOG_ERROR("lock mq on broker:%s failed", pFindBrokerResult->brokerAddr.c_str());
      return false;
    }

    bool lockResult = false;
    for (auto& lockedMq : messageQueues) {
      boost::weak_ptr<PullRequest> weakRequest = getPullRequest(lockedMq);
      // Promote the weak pointer exactly once and keep the owning pointer for
      // the whole update. Checking expired() and then calling lock() again
      // would leave a window in which the PullRequest could be released,
      // making lock() return an empty pointer that is then dereferenced.
      auto pullRequest = weakRequest.lock();
      if (pullRequest) {
        LOG_INFO("lock success of mq:%s", lockedMq.toString().c_str());
        pullRequest->setLocked(true);
        pullRequest->setLastLockTimestamp(UtilAll::currentTimeMillis());
        lockResult = true;
      } else {
        LOG_ERROR("lock fails of mq:%s", lockedMq.toString().c_str());
      }
    }
    return lockResult;
  } catch (const MQException&) {
    LOG_ERROR("lock fails of mq:%s", mq.toString().c_str());
    return false;
  }
}