bool MQClientFactory::updateTopicRouteInfoFromNameServer()

in src/MQClientFactory.cpp [163:225]


bool MQClientFactory::updateTopicRouteInfoFromNameServer(const string& topic,
                                                         const SessionCredentials& session_credentials,
                                                         bool isDefault /* = false */) {
  boost::lock_guard<boost::mutex> lock(m_factoryLock);
  unique_ptr<TopicRouteData> pTopicRouteData;
  LOG_DEBUG("updateTopicRouteInfoFromNameServer start. Topic:%s", topic.c_str());

  if (isDefault) {
    pTopicRouteData.reset(
        m_pClientAPIImpl->getTopicRouteInfoFromNameServer(DEFAULT_TOPIC, 1000 * 5, session_credentials));
    if (pTopicRouteData != NULL) {
      vector<QueueData>& queueDatas = pTopicRouteData->getQueueDatas();
      vector<QueueData>::iterator it = queueDatas.begin();
      for (; it != queueDatas.end(); ++it) {
        int queueNums = std::min(4, it->readQueueNums);
        it->readQueueNums = queueNums;
        it->writeQueueNums = queueNums;
      }
    }
    LOG_DEBUG("getTopicRouteInfoFromNameServer is null for topic :%s", topic.c_str());
  } else {
    pTopicRouteData.reset(m_pClientAPIImpl->getTopicRouteInfoFromNameServer(topic, 1000 * 5, session_credentials));
  }

  if (pTopicRouteData != NULL) {
    LOG_DEBUG("updateTopicRouteInfoFromNameServer has data");
    TopicRouteData* pTemp = getTopicRouteData(topic);
    bool changed = true;
    if (pTemp != NULL) {
      changed = !(*pTemp == *pTopicRouteData);
    }

    if (getConsumerTableSize() > 0) {
      vector<MQMessageQueue> mqs;
      topicRouteData2TopicSubscribeInfo(topic, pTopicRouteData.get(), mqs);
      updateConsumerSubscribeTopicInfo(topic, mqs);
    }

    if (changed) {
      //<!update Broker addr
      LOG_INFO("updateTopicRouteInfoFromNameServer changed:%s", topic.c_str());
      vector<BrokerData> brokerList = pTopicRouteData->getBrokerDatas();
      vector<BrokerData>::iterator it = brokerList.begin();
      for (; it != brokerList.end(); ++it) {
        LOG_INFO("updateTopicRouteInfoFromNameServer changed with broker name:%s", (*it).brokerName.c_str());
        addBrokerToAddrMap((*it).brokerName, (*it).brokerAddrs);
      }

      //<! update publish info;
      {
        boost::shared_ptr<TopicPublishInfo> publishInfo(topicRouteData2TopicPublishInfo(topic, pTopicRouteData.get()));
        addTopicInfoToTable(topic, publishInfo);  // erase first, then add
      }

      //<! update subscribe info
      addTopicRouteData(topic, pTopicRouteData.release());
    }
    LOG_DEBUG("updateTopicRouteInfoFromNameServer end:%s", topic.c_str());
    return true;
  }
  LOG_DEBUG("updateTopicRouteInfoFromNameServer end:%s", topic.c_str());
  return false;
}