in broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java [103:210]
public void cleanItemExpired() {
String when = messageStoreConfig.getDeleteWhen();
if (!UtilAll.isItTimeToDo(when)) {
return;
}
boolean changed = false;
long start = System.currentTimeMillis();
try {
for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
try {
if (isStopped()) {
break;
}
TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic);
if (mappingDetail == null
|| mappingDetail.getHostedQueues().isEmpty()) {
continue;
}
if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) {
log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
continue;
}
Set<String> brokers = new HashSet<>();
for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
if (items.size() <= 1) {
continue;
}
if (!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
brokers.add(earlistItem.getBname());
}
Map<String, TopicStatsTable> statsTable = new HashMap<>();
for (String broker: brokers) {
GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
header.setTopic(topic);
header.setBrokerName(broker);
header.setLo(false);
try {
RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
}
statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
} catch (Throwable rt) {
log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt);
}
}
Map<Integer, List<LogicQueueMappingItem>> newHostedQueues = new HashMap<>();
boolean changedForTopic = false;
for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
Integer qid = entry.getKey();
List<LogicQueueMappingItem> items = entry.getValue();
if (items.size() <= 1) {
continue;
}
if (!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
continue;
}
LogicQueueMappingItem earlistItem = items.get(0);
TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
if (topicStats == null) {
continue;
}
TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
if (topicOffset == null) {
//this may should not happen
log.error("Get null topicOffset for {} {}",topic, earlistItem);
continue;
}
//ignore the maxOffset < 0, which may in case of some error
if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()
|| topicOffset.getMaxOffset() == 0) {
List<LogicQueueMappingItem> newItems = new ArrayList<>(items);
boolean result = newItems.remove(earlistItem);
if (result) {
changedForTopic = true;
newHostedQueues.put(qid, newItems);
}
log.info("The logic queue item {} {} is removed {} because of {}", topic, earlistItem, result, topicOffset);
}
}
if (changedForTopic) {
TopicQueueMappingDetail newMappingDetail = new TopicQueueMappingDetail(mappingDetail.getTopic(), mappingDetail.getTotalQueues(), mappingDetail.getBname(), mappingDetail.getEpoch());
newMappingDetail.getHostedQueues().putAll(mappingDetail.getHostedQueues());
newMappingDetail.getHostedQueues().putAll(newHostedQueues);
this.topicQueueMappingManager.updateTopicQueueMapping(newMappingDetail, false, true, false);
changed = true;
}
} catch (Throwable tt) {
log.error("Try CleanItemExpired failed for {}", topic, tt);
} finally {
UtilAll.sleep(10);
}
}
} catch (Throwable t) {
log.error("Try cleanItemExpired failed", t);
} finally {
if (changed) {
this.topicQueueMappingManager.getDataVersion().nextVersion();
this.topicQueueMappingManager.persist();
log.info("CleanItemExpired changed");
}
log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
}
}