public void cleanItemListMoreThanSecondGen()

in broker/src/main/java/org/apache/rocketmq/broker/topic/TopicQueueMappingCleanService.java [212:331]


    public void cleanItemListMoreThanSecondGen() {
        String when = messageStoreConfig.getDeleteWhen();
        if (!UtilAll.isItTimeToDo(when)) {
            return;
        }
        boolean changed = false;
        long start = System.currentTimeMillis();
        try {
            ClientMetadata clientMetadata = new ClientMetadata();
            for (String topic : this.topicQueueMappingManager.getTopicQueueMappingTable().keySet()) {
                try {
                    if (isStopped()) {
                        break;
                    }
                    TopicQueueMappingDetail mappingDetail = this.topicQueueMappingManager.getTopicQueueMappingTable().get(topic);
                    if (mappingDetail == null
                            || mappingDetail.getHostedQueues().isEmpty()) {
                        continue;
                    }
                    if (!mappingDetail.getBname().equals(brokerConfig.getBrokerName())) {
                        log.warn("The TopicQueueMappingDetail [{}] should not exist in this broker", mappingDetail);
                        continue;
                    }
                    Map<Integer, String> qid2CurrLeaderBroker = new HashMap<>();
                    for (Map.Entry<Integer, List<LogicQueueMappingItem>> entry : mappingDetail.getHostedQueues().entrySet()) {
                        Integer qId = entry.getKey();
                        List<LogicQueueMappingItem> items = entry.getValue();
                        if (items.isEmpty()) {
                            continue;
                        }
                        LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
                        if (!leaderItem.getBname().equals(mappingDetail.getBname())) {
                            qid2CurrLeaderBroker.put(qId, leaderItem.getBname());
                        }
                    }
                    if (qid2CurrLeaderBroker.isEmpty()) {
                        continue;
                    }
                    //find the topic route
                    TopicRouteData topicRouteData = brokerOuterAPI.getTopicRouteInfoFromNameServer(topic, brokerConfig.getForwardTimeout());
                    clientMetadata.freshTopicRoute(topic, topicRouteData);
                    Map<Integer, String> qid2RealLeaderBroker = new HashMap<>();
                    //fine the real leader
                    for (Map.Entry<Integer, String> entry : qid2CurrLeaderBroker.entrySet()) {
                        qid2RealLeaderBroker.put(entry.getKey(), clientMetadata.getBrokerNameFromMessageQueue(new MessageQueue(topic, TopicQueueMappingUtils.getMockBrokerName(mappingDetail.getScope()), entry.getKey())));
                    }

                    //find the mapping detail of real leader
                    Map<String, TopicQueueMappingDetail> mappingDetailMap = new HashMap<>();
                    for (Map.Entry<Integer, String> entry : qid2RealLeaderBroker.entrySet()) {
                        if (entry.getValue().startsWith(MixAll.LOGICAL_QUEUE_MOCK_BROKER_PREFIX)) {
                            continue;
                        }
                        String broker = entry.getValue();
                        GetTopicConfigRequestHeader header = new GetTopicConfigRequestHeader();
                        header.setTopic(topic);
                        header.setBrokerName(broker);
                        header.setLo(true);
                        try {
                            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_TOPIC_CONFIG, header, null);
                            RpcResponse rpcResponse = rpcClient.invoke(rpcRequest, brokerConfig.getForwardTimeout()).get();
                            if (rpcResponse.getException() != null) {
                                throw rpcResponse.getException();
                            }
                            TopicQueueMappingDetail mappingDetailRemote = ((TopicConfigAndQueueMapping) rpcResponse.getBody()).getMappingDetail();
                            if (broker.equals(mappingDetailRemote.getBname())) {
                                mappingDetailMap.put(broker, mappingDetailRemote);
                            }
                        } catch (Throwable rt) {
                            log.error("Get remote topic {} state info failed from broker {}", topic, broker, rt);
                        }
                    }
                    //check all the info
                    Set<Integer> ids2delete = new HashSet<>();
                    for (Map.Entry<Integer, String> entry : qid2CurrLeaderBroker.entrySet()) {
                        Integer qId = entry.getKey();
                        String currLeaderBroker = entry.getValue();
                        String realLeaderBroker = qid2RealLeaderBroker.get(qId);
                        TopicQueueMappingDetail remoteMappingDetail = mappingDetailMap.get(realLeaderBroker);
                        if (remoteMappingDetail == null
                                || remoteMappingDetail.getTotalQueues() != mappingDetail.getTotalQueues()
                                || remoteMappingDetail.getEpoch() != mappingDetail.getEpoch()) {
                            continue;
                        }
                        List<LogicQueueMappingItem> items = remoteMappingDetail.getHostedQueues().get(qId);
                        if (items.isEmpty()) {
                            continue;
                        }
                        LogicQueueMappingItem leaderItem = items.get(items.size() - 1);
                        if (!realLeaderBroker.equals(leaderItem.getBname())) {
                            continue;
                        }
                        //all the check is ok
                        if (!realLeaderBroker.equals(currLeaderBroker)) {
                            ids2delete.add(qId);
                        }
                    }
                    for (Integer qid : ids2delete) {
                        List<LogicQueueMappingItem> items = mappingDetail.getHostedQueues().remove(qid);
                        changed =  true;
                        if (items != null) {
                            log.info("Remove the ItemListMoreThanSecondGen topic {} qid {} items {}", topic, qid, items);
                        }
                    }
                } catch (Throwable tt) {
                    log.error("Try cleanItemListMoreThanSecondGen failed for topic {}", topic, tt);
                } finally {
                    UtilAll.sleep(10);
                }
            }
        } catch (Throwable t) {
            log.error("Try cleanItemListMoreThanSecondGen failed", t);
        } finally {
            if (changed) {
                this.topicQueueMappingManager.getDataVersion().nextVersion();
                this.topicQueueMappingManager.persist();
            }
            log.info("Try cleanItemListMoreThanSecondGen cost {} ms", System.currentTimeMillis() - start);
        }
    }