public void statisticDltBrokerStatus()

in inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java [236:366]


    /**
     * Periodically refreshes the cached link-quality statistics and re-evaluates which
     * brokers belong in the response-quality forbidden map.
     *
     * <p>Processing order:
     * <ol>
     *   <li>Return at once when neither the session-statistic check duration nor the
     *       max-forbidden check duration has elapsed since its last run.</li>
     *   <li>Refresh {@code cachedLinkQualities} when the session-statistic check duration
     *       has been exceeded.</li>
     *   <li>Once the max-forbidden check duration has elapsed: log the current
     *       forbidden/unavailable maps (on the first and then every tenth evaluation),
     *       release brokers whose forbidden period has expired, and forbid brokers whose
     *       successful-receive count is far below that of their peers, never exceeding
     *       the share allowed by {@code clientConfig.getMaxSentForbiddenRate()}.</li>
     * </ol>
     *
     * <p>When only a single candidate broker remains there is no peer to compare it
     * against, so it is never forbidden (previously this case divided by zero).
     */
    public void statisticDltBrokerStatus() {
        // #lizard forgives
        long currentTime = System.currentTimeMillis();
        if ((currentTime - this.lastLinkStatisticTime < this.clientConfig.getSessionStatisticCheckDuration())
                && (currentTime - this.lastQualityStatisticTime < this.clientConfig.getMaxForbiddenCheckDuration())) {
            return;
        }
        if (currentTime - this.lastLinkStatisticTime > this.clientConfig.getSessionStatisticCheckDuration()) {
            this.lastLinkStatisticTime = System.currentTimeMillis();
            this.cachedLinkQualities = getCurBrokerSentWaitStats();
        }
        // The quality evaluation below runs on its own, independent schedule.
        if (System.currentTimeMillis() - this.lastQualityStatisticTime < this.clientConfig
                .getMaxForbiddenCheckDuration()) {
            return;
        }
        StringBuilder sBuilder = new StringBuilder(512);
        this.lastQualityStatisticTime = System.currentTimeMillis();
        // Throttle the status dump: only the first and every tenth evaluation prints it.
        if ((printCount++ % 10 == 0) && (!brokerStats.isEmpty())) {
            if (!brokerForbiddenMap.isEmpty()) {
                logger.info(sBuilder.append("[status check]: current response quality respForbiddenMap is ")
                        .append(brokerForbiddenMap.toString()).toString());
                sBuilder.delete(0, sBuilder.length());
            }
            if (!rpcServiceFactory.getForbiddenAddrMap().isEmpty()) {
                logger.info(sBuilder.append("[status check]: current request quality reqForbiddenMap is ")
                        .append(rpcServiceFactory.getForbiddenAddrMap().toString()).toString());
                sBuilder.delete(0, sBuilder.length());
            }
            if (!rpcServiceFactory.getUnavailableBrokerMap().isEmpty()) {
                logger.info(sBuilder.append("[status check]: current service unavailable brokerMap is ")
                        .append(rpcServiceFactory.getUnavailableBrokerMap().toString()).toString());
                sBuilder.delete(0, sBuilder.length());
            }
        }

        boolean changed = false;
        long totalSuccRecNum = 0L;
        HashMap<Integer, BrokerStatsDltTuple> needSelNumTMap =
                new HashMap<>();
        for (Map.Entry<Integer, BrokerStatsItemSet> brokerStatsEntry : brokerStats.entrySet()) {
            BrokerStatsItemSet curStatsItemSet = brokerStatsEntry.getValue();
            if (curStatsItemSet == null) {
                continue;
            }
            // Both counters are read for every broker, forbidden or not, before the
            // forbidden check (presumably the getters also snapshot/reset the deltas -
            // TODO confirm), so the call order is kept as is.
            long sendNum = curStatsItemSet.getDltAndSnapshotSendNum();
            long succRecvNum = curStatsItemSet.getDltAndSnapshotRecSucNum();
            if (!brokerForbiddenMap.containsKey(brokerStatsEntry.getKey())) {
                totalSuccRecNum += succRecvNum;
                needSelNumTMap.put(brokerStatsEntry.getKey(),
                        new BrokerStatsDltTuple(succRecvNum, sendNum));
            }
        }
        // Release brokers whose forbidden period has expired.
        // NOTE(review): removing while iterating is only safe if brokerForbiddenMap is a
        // concurrent map - its declaration is outside this method, confirm.
        for (Map.Entry<Integer, Long> brokerForbiddenEntry : brokerForbiddenMap.entrySet()) {
            if (System.currentTimeMillis() - brokerForbiddenEntry.getValue() > this.clientConfig
                    .getMaxForbiddenCheckDuration()) {
                changed = true;
                brokerForbiddenMap.remove(brokerForbiddenEntry.getKey());
            }
        }
        if (needSelNumTMap.isEmpty()) {
            logForbiddenSetIfChanged(changed, 1, sBuilder);
            return;
        }
        List<Map.Entry<Integer, BrokerStatsDltTuple>> lstData =
                new ArrayList<>(needSelNumTMap.entrySet());
        // Sort the list in ascending order
        Collections.sort(lstData, new BrokerStatsDltTupleComparator(false));
        int filteredBrokerListSize = lstData.size();
        // Number of additional brokers that may still be forbidden without exceeding the
        // configured rate over all brokers (candidates + already forbidden).
        int needHoldCount =
                (int) Math.rint((filteredBrokerListSize + brokerForbiddenMap.size())
                        * clientConfig.getMaxSentForbiddenRate());
        needHoldCount -= brokerForbiddenMap.size();
        if (needHoldCount <= 0) {
            logForbiddenSetIfChanged(changed, 2, sBuilder);
            return;
        }
        // Trim the ends of the sorted list before averaging: the first entry for small
        // sets, the first two and the last entry otherwise.
        int trimmedCount = 1;
        totalSuccRecNum -= lstData.get(0).getValue().getSuccRecvNum();
        if (filteredBrokerListSize > 3) {
            totalSuccRecNum -= lstData.get(1).getValue().getSuccRecvNum();
            totalSuccRecNum -= lstData.get(lstData.size() - 1).getValue().getSuccRecvNum();
            trimmedCount = 3;
        }
        int peerCount = filteredBrokerListSize - trimmedCount;
        long avgSuccRecNumThreshold = 0L;
        // peerCount is 0 when a single candidate is left; dividing would throw
        // ArithmeticException. A threshold of 0 means that lone broker is never forbidden.
        if (peerCount > 0) {
            avgSuccRecNumThreshold = (long) (totalSuccRecNum / peerCount * 0.2);
        }
        List<Integer> newForbiddenBrokerIds = new ArrayList<>();
        for (Map.Entry<Integer, BrokerStatsDltTuple> brokerDltNumEntry : lstData) {
            long succRecvNum = brokerDltNumEntry.getValue().getSuccRecvNum();
            long succSendNumThreshold = (long) (brokerDltNumEntry.getValue().getSendNum() * 0.1);
            // Forbid only when the broker is well below its peers, has sent enough
            // requests to be meaningful, and succeeded on less than 10% of them.
            if ((succRecvNum < avgSuccRecNumThreshold) && (succSendNumThreshold > 2)
                    && (succRecvNum < succSendNumThreshold)) {
                newForbiddenBrokerIds.add(brokerDltNumEntry.getKey());
                if (logger.isDebugEnabled()) {
                    logger.debug(sBuilder.append("[forbidden statistic] brokerId=")
                            .append(brokerDltNumEntry.getKey()).append(",succRecvNum=")
                            .append(succRecvNum).append(",avgSuccRecNumThreshold=")
                            .append(avgSuccRecNumThreshold).append(",succSendNumThreshold=")
                            .append(succSendNumThreshold).toString());
                    sBuilder.delete(0, sBuilder.length());
                }
            }
            // Stop once the quota is used up or the current entry already reaches the
            // threshold.
            if ((newForbiddenBrokerIds.size() >= needHoldCount)
                    || (succRecvNum >= avgSuccRecNumThreshold)) {
                break;
            }
        }
        for (Integer newForbiddenBrokerId : newForbiddenBrokerIds) {
            changed = true;
            brokerForbiddenMap.put(newForbiddenBrokerId, System.currentTimeMillis());
        }
        logForbiddenSetIfChanged(changed, 3, sBuilder);
    }

    /**
     * Logs the current forbidden broker set when this evaluation changed it and the set
     * is not empty; the builder is cleared again after use.
     *
     * @param changed  whether the forbidden map was modified during this evaluation
     * @param stage    exit-point number printed in the message ("End statistic N")
     * @param sBuilder reusable buffer used to assemble the log line
     */
    private void logForbiddenSetIfChanged(boolean changed, int stage, StringBuilder sBuilder) {
        if (changed && !brokerForbiddenMap.isEmpty()) {
            logger.info(sBuilder.append("End statistic ").append(stage)
                    .append(": forbidden Broker Set is ")
                    .append(brokerForbiddenMap.toString()).toString());
            sBuilder.delete(0, sBuilder.length());
        }
    }