in src/MQClientFactory.cpp [163:225]
/**
 * Fetch route data for a topic from the name server and refresh the factory's
 * cached state (consumer subscribe info, broker addresses, publish info and
 * the topic route table) from it.
 *
 * m_factoryLock is held for the whole call, including the name-server request.
 *
 * @param topic                Topic whose cached route information is updated.
 * @param session_credentials  Credentials forwarded to the name-server request.
 * @param isDefault            When true, the route of DEFAULT_TOPIC is fetched
 *                             instead of `topic`, and every queue's read/write
 *                             queue count is capped at 4 before it is stored
 *                             under `topic`.
 * @return true if the name server returned route data (whether or not it
 *         differed from the cached copy); false if nothing was returned.
 */
bool MQClientFactory::updateTopicRouteInfoFromNameServer(const string& topic,
                                                         const SessionCredentials& session_credentials,
                                                         bool isDefault /* = false */) {
  boost::lock_guard<boost::mutex> lock(m_factoryLock);
  unique_ptr<TopicRouteData> pTopicRouteData;
  LOG_DEBUG("updateTopicRouteInfoFromNameServer start. Topic:%s", topic.c_str());

  // NOTE(review): the second argument (1000 * 5) looks like a timeout in
  // milliseconds -- confirm against getTopicRouteInfoFromNameServer.
  if (isDefault) {
    pTopicRouteData.reset(
        m_pClientAPIImpl->getTopicRouteInfoFromNameServer(DEFAULT_TOPIC, 1000 * 5, session_credentials));
    if (pTopicRouteData != NULL) {
      // Cap the queue counts taken over from the default topic at 4, and use
      // the capped read count for the write count as well.
      vector<QueueData>& queueDatas = pTopicRouteData->getQueueDatas();
      vector<QueueData>::iterator it = queueDatas.begin();
      for (; it != queueDatas.end(); ++it) {
        int queueNums = std::min(4, it->readQueueNums);
        it->readQueueNums = queueNums;
        it->writeQueueNums = queueNums;
      }
    } else {
      // Only report a null route when the lookup actually returned nothing;
      // previously this message was logged even after a successful lookup.
      LOG_DEBUG("getTopicRouteInfoFromNameServer is null for topic :%s", topic.c_str());
    }
  } else {
    pTopicRouteData.reset(m_pClientAPIImpl->getTopicRouteInfoFromNameServer(topic, 1000 * 5, session_credentials));
  }

  if (pTopicRouteData == NULL) {
    LOG_DEBUG("updateTopicRouteInfoFromNameServer end:%s", topic.c_str());
    return false;
  }

  LOG_DEBUG("updateTopicRouteInfoFromNameServer has data");

  // Treat the route as changed unless a cached copy exists and compares equal.
  TopicRouteData* pTemp = getTopicRouteData(topic);
  bool changed = true;
  if (pTemp != NULL) {
    changed = !(*pTemp == *pTopicRouteData);
  }

  // Consumer subscribe info is refreshed on every successful fetch, regardless
  // of whether the route differs from the cached copy.
  if (getConsumerTableSize() > 0) {
    vector<MQMessageQueue> mqs;
    topicRouteData2TopicSubscribeInfo(topic, pTopicRouteData.get(), mqs);
    updateConsumerSubscribeTopicInfo(topic, mqs);
  }

  if (changed) {
    //<!update Broker addr
    LOG_INFO("updateTopicRouteInfoFromNameServer changed:%s", topic.c_str());
    vector<BrokerData> brokerList = pTopicRouteData->getBrokerDatas();
    vector<BrokerData>::iterator it = brokerList.begin();
    for (; it != brokerList.end(); ++it) {
      LOG_INFO("updateTopicRouteInfoFromNameServer changed with broker name:%s", (*it).brokerName.c_str());
      addBrokerToAddrMap((*it).brokerName, (*it).brokerAddrs);
    }

    //<! update publish info;
    {
      boost::shared_ptr<TopicPublishInfo> publishInfo(topicRouteData2TopicPublishInfo(topic, pTopicRouteData.get()));
      addTopicInfoToTable(topic, publishInfo);  // erase first, then add
    }

    //<! update subscribe info
    // The unique_ptr gives up ownership here; presumably addTopicRouteData
    // takes over the raw pointer -- verify against its implementation.
    addTopicRouteData(topic, pTopicRouteData.release());
  }

  LOG_DEBUG("updateTopicRouteInfoFromNameServer end:%s", topic.c_str());
  return true;
}