in namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java [700:801]
/**
 * Assembles the route data (queue data, broker data, filter servers and queue mapping)
 * registered for the given topic.
 *
 * <p>The queue/broker/filter-server tables are read while holding the read lock. If the
 * lock cannot be acquired because the thread is interrupted, the interrupt status is
 * restored and the method returns {@code null}.
 *
 * <p>When acting-master support is enabled in the name server config, and the topic is not
 * a sync-broker-member-group topic, a broker group that has addresses but no entry under
 * {@link MixAll#MASTER_ID} and whose queue for this topic is not writeable gets its
 * lowest-id address re-keyed under {@link MixAll#MASTER_ID} in the returned data.
 *
 * @param topic the topic whose route is looked up
 * @return the assembled route data, or {@code null} when no queue data or no broker data
 *         was found for the topic
 */
public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    List<BrokerData> brokerDataList = new LinkedList<>();
    topicRouteData.setBrokerDatas(brokerDataList);
    HashMap<String, List<String>> filterServerMap = new HashMap<>();
    topicRouteData.setFilterServerTable(filterServerMap);

    // Tracks whether the read lock was really acquired, so that the finally block never
    // unlocks a lock this thread does not hold (e.g. when lockInterruptibly() is interrupted).
    boolean readLockHeld = false;
    try {
        this.lock.readLock().lockInterruptibly();
        readLockHeld = true;

        Map<String, QueueData> queueDataMap = this.topicQueueTable.get(topic);
        if (queueDataMap != null) {
            topicRouteData.setQueueDatas(new ArrayList<>(queueDataMap.values()));
            foundQueueData = true;

            // The keys of the per-topic queue table are broker names.
            Set<String> brokerNameSet = new HashSet<>(queueDataMap.keySet());
            for (String brokerName : brokerNameSet) {
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (null == brokerData) {
                    continue;
                }

                // Hand out a copy so later edits (acting master below) do not touch the
                // instance stored in brokerAddrTable.
                // NOTE(review): relies on BrokerData's copy constructor copying the address map - confirm.
                BrokerData brokerDataClone = new BrokerData(brokerData);
                brokerDataList.add(brokerDataClone);
                foundBrokerData = true;

                if (this.filterServerTable.isEmpty()) {
                    continue;
                }

                for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                    BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(brokerDataClone.getCluster(), brokerAddr);
                    List<String> filterServerList = this.filterServerTable.get(brokerAddrInfo);
                    // Only publish addresses that actually have filter servers; avoid null values in the map.
                    if (filterServerList != null) {
                        filterServerMap.put(brokerAddr, filterServerList);
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        // Preserve the interrupt for the caller instead of silently swallowing it.
        Thread.currentThread().interrupt();
        log.error("pickupTopicRouteData Exception", e);
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    } finally {
        if (readLockHeld) {
            this.lock.readLock().unlock();
        }
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    if (!foundBrokerData || !foundQueueData) {
        return null;
    }

    topicRouteData.setTopicQueueMappingByBroker(this.topicQueueMappingInfoTable.get(topic));

    if (!namesrvConfig.isSupportActingMaster()) {
        return topicRouteData;
    }

    if (topic.startsWith(TopicValidator.SYNC_BROKER_MEMBER_GROUP_PREFIX)) {
        return topicRouteData;
    }

    if (topicRouteData.getBrokerDatas().isEmpty() || topicRouteData.getQueueDatas().isEmpty()) {
        return topicRouteData;
    }

    // Cheap pre-check: is there any broker group that has addresses but no master entry?
    boolean needActingMaster = false;
    for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
        if (!brokerData.getBrokerAddrs().isEmpty()
            && !brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
            needActingMaster = true;
            break;
        }
    }

    if (!needActingMaster) {
        return topicRouteData;
    }

    for (final BrokerData brokerData : topicRouteData.getBrokerDatas()) {
        final HashMap<Long, String> brokerAddrs = brokerData.getBrokerAddrs();
        if (brokerAddrs.isEmpty() || brokerAddrs.containsKey(MixAll.MASTER_ID) || !brokerData.isEnableActingMaster()) {
            continue;
        }

        // No master: look at this broker group's queue data for the topic.
        for (final QueueData queueData : topicRouteData.getQueueDatas()) {
            if (queueData.getBrokerName().equals(brokerData.getBrokerName())) {
                if (!PermName.isWriteable(queueData.getPerm())) {
                    // Re-key the address with the smallest broker id under MASTER_ID.
                    final Long minBrokerId = Collections.min(brokerAddrs.keySet());
                    final String actingMasterAddr = brokerAddrs.remove(minBrokerId);
                    brokerAddrs.put(MixAll.MASTER_ID, actingMasterAddr);
                }
                // Only the first queue data entry matching this broker name is considered.
                break;
            }
        }
    }

    return topicRouteData;
}