public void cleanItemExpired()

in broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java [103:210]


    public void cleanItemExpired() {
        String when = messageStoreConfig.getDeleteWhen();
        if (!UtilAll.isItTimeToDo(when)) {
            return;
        }
        boolean changed = false;
        long start = System.currentTimeMillis();
        try {
            for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
                try {
                    if (isStopped()) {
                        break;
                    }
                    TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic);
                    if (mappingDetail == null
                            || mappingDetail.getHostedQueues().isEmpty()) {
                        continue;
                    }
                    if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) {
                        log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
                        continue;
                    }
                    Set<String> brokers = new HashSet<>();
                    for (List<LogicQueueMappingItem> items: mappingDetail.getHostedQueues().values()) {
                        if (items.size() <= 1) {
                            continue;
                        }
                        if (!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
                            continue;
                        }
                        LogicQueueMappingItem earlistItem = items.get(0);
                        brokers.add(earlistItem.getBname());
                    }
                    Map<String, TopicStatsTable> statsTable = new HashMap<>();
                    for (String broker: brokers) {
                        GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
                        header.setTopic(topic);
                        header.setBrokerName(broker);
                        header.setLo(false);
                        try {
                            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_STATS_INFO, header, null);
                            RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
                            if (rpcResponse.getException() != null) {
                                throw rpcResponse.getException();
                            }
                            statsTable.put(broker, (TopicStatsTable) rpcResponse.getBody());
                        } catch (Throwable rt) {
                            log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt);
                        }
                    }
                    Map<Integer, List<LogicQueueMappingItem>> newHostedQueues = new HashMap<>();
                    boolean changedForTopic = false;
                    for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
                        Integer qid = entry.getKey();
                        List<LogicQueueMappingItem> items = entry.getValue();
                        if (items.size() <= 1) {
                            continue;
                        }
                        if (!TopicQueueMappingUtils.checkIfLeader(items, mappingDetail)) {
                            continue;
                        }
                        LogicQueueMappingItem earlistItem = items.get(0);
                        TopicStatsTable topicStats = statsTable.get(earlistItem.getBname());
                        if (topicStats == null) {
                            continue;
                        }
                        TopicOffset topicOffset = topicStats.getOffsetTable().get(new MessageQueue(topic, earlistItem.getBname(), earlistItem.getQueueId()));
                        if (topicOffset == null) {
                            //this may should not happen
                            log.error("Get null topicOffset for {} {}",topic,  earlistItem);
                            continue;
                        }
                        //ignore the maxOffset < 0, which may in case of some error
                        if (topicOffset.getMaxOffset() == topicOffset.getMinOffset()
                            || topicOffset.getMaxOffset() == 0) {
                            List<LogicQueueMappingItem> newItems = new ArrayList<>(items);
                            boolean result = newItems.remove(earlistItem);
                            if (result) {
                                changedForTopic = true;
                                newHostedQueues.put(qid, newItems);
                            }
                            log.info("The logic queue item {} {} is removed {} because of {}", topic, earlistItem, result, topicOffset);
                        }
                    }
                    if (changedForTopic) {
                        TopicQueueMappingDetail newMappingDetail = new TopicQueueMappingDetail(mappingDetail.getTopic(), mappingDetail.getTotalQueues(), mappingDetail.getBname(), mappingDetail.getEpoch());
                        newMappingDetail.getHostedQueues().putAll(mappingDetail.getHostedQueues());
                        newMappingDetail.getHostedQueues().putAll(newHostedQueues);
                        this.topicQueueMappingManager.updateTopicQueueMapping(newMappingDetail, false, true, false);
                        changed = true;
                    }
                } catch (Throwable tt) {
                    log.error("Try CleanItemExpired failed for {}", topic, tt);
                } finally {
                    UtilAll.sleep(10);
                }
            }
        } catch (Throwable t) {
            log.error("Try cleanItemExpired failed", t);
        } finally {
            if (changed) {
                this.topicQueueMappingManager.getDataVersion().nextVersion();
                this.topicQueueMappingManager.persist();
                log.info("CleanItemExpired changed");
            }
            log.info("cleanItemExpired cost {} ms", System.currentTimeMillis() - start);
        }
    }