in broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java [212:331]
/**
 * Removes locally hosted queue item lists whose current leader is another broker, once the
 * leadership has been confirmed against the name server route and the remote broker's own
 * mapping detail.
 *
 * <p>The method returns immediately unless {@code UtilAll.isItTimeToDo} accepts the configured
 * {@code deleteWhen} value. For every topic in the local mapping table it:
 * <ol>
 *   <li>collects the queue ids whose last mapping item names a broker other than this one;</li>
 *   <li>fetches the topic route and resolves the broker currently recorded for each such queue;</li>
 *   <li>requests the topic config (with mapping detail) from each resolved broker;</li>
 *   <li>drops the local item list only if the remote detail has the same total queue count and
 *       epoch, and its last item for the queue names the resolved broker itself.</li>
 * </ol>
 *
 * <p>Failures are logged and never propagated: a failure for one topic does not stop the scan of
 * the others. If at least one item list was removed, the data version is advanced and the
 * mapping table is persisted.
 */
public void cleanItemListMoreThanSecondGen() {
    String when = messageStoreConfig.getDeleteWhen();
    if (!UtilAll.isItTimeToDo(when)) {
        return;
    }
    boolean changed = false;
    long start = System.currentTimeMillis();
    try {
        ClientMetadata clientMetadata = new ClientMetadata();
        for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
            try {
                if (isStopped()) {
                    break;
                }
                TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic);
                if (mappingDetail == null
                    || mappingDetail.getHostedQueues().isEmpty()) {
                    continue;
                }
                if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) {
                    log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
                    continue;
                }
                // Queue ids whose newest (last) item points at a broker other than this one.
                Map<Integer, String> qid2CurrLeaderBroker = new HashMap<>();
                for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
                    Integer qId = entry.getKey();
                    List<LogicQueueMappingItem> items = entry.getValue();
                    if (items.isEmpty()) {
                        continue;
                    }
                    LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
                    if (!leaderItem.getBname().equals(mappingDetail.getBname())) {
                        qid2CurrLeaderBroker.put(qId, leaderItem.getBname());
                    }
                }
                if (qid2CurrLeaderBroker.isEmpty()) {
                    continue;
                }
                //find the topic route
                TopicRouteData topicRouteData = brokerOuterAPI.getTopicRouteInfoFromNameServer(topic, brokerConfig.getForwardTimeout());
                clientMetadata.freshTopicRoute(topic, topicRouteData);
                Map<Integer, String> qid2RealLeaderBroker = new HashMap<>();
                //find the real leader
                for (Map.Entry<Integer, String> entry : qid2CurrLeaderBroker.entrySet()) {
                    qid2RealLeaderBroker.put(entry.getKey(), clientMetadata.getBrokerNameFromMessageQueue(new MessageQueue(topic, TopicQueueMappingUtils.getMockBrokerName(mappingDetail.getScope()), entry.getKey())));
                }
                //find the mapping detail of real leader
                Map<String, TopicQueueMappingDetail> mappingDetailMap = new HashMap<>();
                for (Map.Entry<Integer, String> entry : qid2RealLeaderBroker.entrySet()) {
                    String broker = entry.getValue();
                    // An unresolved (null) or still-mocked broker name cannot be queried.
                    if (broker == null
                        || broker.startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {
                        continue;
                    }
                    // Several queue ids may resolve to the same broker; one successful fetch is enough.
                    if (mappingDetailMap.containsKey(broker)) {
                        continue;
                    }
                    GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
                    header.setTopic(topic);
                    header.setBrokerName(broker);
                    header.setLo(true);
                    try {
                        RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
                        RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
                        if (rpcResponse.getException() != null) {
                            throw rpcResponse.getException();
                        }
                        TopicQueueMappingDetail mappingDetailRemote = ((TopicConfigAndQueueMapping) rpcResponse.getBody()).getMappingDetail();
                        if (mappingDetailRemote != null
                            && broker.equals(mappingDetailRemote.getBname())) {
                            mappingDetailMap.put(broker, mappingDetailRemote);
                        }
                    } catch (Throwable rt) {
                        // Best effort: queues of this broker are simply not cleaned in this round.
                        log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt);
                    }
                }
                //check all the info
                Set<Integer> ids2delete = new HashSet<>();
                for (Map.Entry<Integer, String> entry : qid2CurrLeaderBroker.entrySet()) {
                    Integer qId = entry.getKey();
                    String currLeaderBroker = entry.getValue();
                    String realLeaderBroker = qid2RealLeaderBroker.get(qId);
                    if (realLeaderBroker == null) {
                        continue;
                    }
                    TopicQueueMappingDetail remoteMappingDetail = mappingDetailMap.get(realLeaderBroker);
                    if (remoteMappingDetail == null
                        || remoteMappingDetail.getTotalQueues() != mappingDetail.getTotalQueues()
                        || remoteMappingDetail.getEpoch() != mappingDetail.getEpoch()) {
                        continue;
                    }
                    // The remote broker may not host this queue id at all, in which case get() returns null.
                    List<LogicQueueMappingItem> items = remoteMappingDetail.getHostedQueues().get(qId);
                    if (items == null || items.isEmpty()) {
                        continue;
                    }
                    LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
                    if (!realLeaderBroker.equals(leaderItem.getBname())) {
                        continue;
                    }
                    //all the check is ok
                    if (!realLeaderBroker.equals(currLeaderBroker)) {
                        ids2delete.add(qId);
                    }
                }
                for (Integer qid : ids2delete) {
                    List<LogicQueueMappingItem> items = mappingDetail.getHostedQueues().remove(qid);
                    // Only an actual removal justifies a new data version and a persist.
                    if (items != null) {
                        changed = true;
                        log.info("Remove the ItemListMoreThanSecondGen topic {} qid {} items {}", topic, qid, items);
                    }
                }
            } catch (Throwable tt) {
                log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
            } finally {
                // Short pause after every topic, including skipped and failed ones.
                UtilAll.sleep(10);
            }
        }
    } catch (Throwable t) {
        log.error("Try cleanItemListMoreThanSecondGen failed", t);
    } finally {
        if (changed) {
            this.topicQueueMappingManager.getDataVersion().nextVersion();
            this.topicQueueMappingManager.persist();
        }
        log.info("Try cleanItemListMoreThanSecondGen cost {} ms", System.currentTimeMillis() - start);
    }
}