public List getAllowedBrokerPartitions()

in inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java [136:219]


    /**
     * Collect the partitions of the brokers that may currently be selected for sending.
     *
     * <p>A broker entry of {@code brokerPartList} is skipped when any of the following holds:
     * <ul>
     *   <li>its id is in the RPC factory's unavailable-broker map and the recorded time is
     *       no older than {@code clientConfig.getUnAvailableFbdDurationMs()};</li>
     *   <li>its id is a key of {@code brokerForbiddenMap};</li>
     *   <li>its partition list is null or empty, or the first partition is null;</li>
     *   <li>the counter kept for it in {@code brokerCurSentReqNum} exceeds
     *       {@code clientConfig.getLinkMaxAllowedDelayedMsgCount()};</li>
     *   <li>{@code rpcServiceFactory.isRemoteAddrForbidden(...)} reports its address as forbidden.</li>
     * </ul>
     *
     * <p>When the session-wide counter is above
     * {@code clientConfig.getSessionWarnDelayedMsgCount()}, the number of brokers to return is
     * reduced to {@code rint(allowed * (1 - sessionWarnForbiddenRate))}. In that case brokers
     * that have no entry in {@code cachedLinkQualities} are taken first, then brokers in the
     * iteration order of {@code cachedLinkQualities}
     * (NOTE(review): presumably ordered best link first - confirm where that list is built).
     * If {@code cachedLinkQualities} is empty the reduction is not applied and every allowed
     * broker is returned.
     *
     * <p>NOTE(review): when the reduced quota rounds to 0, one broker without cached stats is
     * still returned if such a broker exists; otherwise the returned list is empty. This
     * matches the previous behaviour - confirm whether callers rely on it.
     *
     * @param brokerPartList partitions grouped by broker id; must not be null or empty
     * @return the partitions of the selected brokers; may be empty in the case noted above
     * @throws TubeClientException if {@code brokerPartList} is null or empty, if the
     *         session-wide counter has reached
     *         {@code clientConfig.getSessionMaxAllowedDelayedMsgCount()}, or if every broker
     *         was filtered out
     */
    public List<Partition> getAllowedBrokerPartitions(
            Map<Integer, List<Partition>> brokerPartList) throws TubeClientException {
        // #lizard forgives
        if ((brokerPartList == null) || (brokerPartList.isEmpty())) {
            throw new TubeClientException("Null brokers to select sent, please try later!");
        }
        // Hard limit: refuse to select anything once the session-wide counter reaches the cap.
        long currentWaitCount = this.curTotalSentRequestNum.get();
        if (currentWaitCount >= this.clientConfig.getSessionMaxAllowedDelayedMsgCount()) {
            throw new TubeClientException(new StringBuilder(512)
                    .append("Current delayed messages over max allowed count, allowed is ")
                    .append(this.clientConfig.getSessionMaxAllowedDelayedMsgCount())
                    .append(", current count is ").append(currentWaitCount).toString());
        }
        // Step 1: filter out the brokers that must not be used right now.
        long curTime = System.currentTimeMillis();
        Set<Integer> allowedBrokerIds = new HashSet<>();
        ConcurrentHashMap<Integer, Long> unAvailableBrokerMap = rpcServiceFactory.getUnavailableBrokerMap();
        for (Map.Entry<Integer, List<Partition>> oldBrokerPartEntry : brokerPartList.entrySet()) {
            Integer entryBrokerId = oldBrokerPartEntry.getKey();
            Long lastAddTime = unAvailableBrokerMap.get(entryBrokerId);
            if ((lastAddTime != null)
                    && (curTime - lastAddTime <= clientConfig.getUnAvailableFbdDurationMs())) {
                // Marked unavailable too recently.
                continue;
            }
            if (this.brokerForbiddenMap.containsKey(entryBrokerId)) {
                continue;
            }
            List<Partition> partitionList = oldBrokerPartEntry.getValue();
            if ((partitionList == null) || partitionList.isEmpty()) {
                continue;
            }
            // Only the first partition is inspected to obtain the broker of this entry.
            Partition partition = partitionList.get(0);
            if (partition == null) {
                continue;
            }
            BrokerInfo brokerInfo = partition.getBroker();
            AtomicLong curMaxSentNum =
                    this.brokerCurSentReqNum.get(brokerInfo.getBrokerId());
            if ((curMaxSentNum != null)
                    && (curMaxSentNum.get() > this.clientConfig.getLinkMaxAllowedDelayedMsgCount())) {
                // Per-broker counter is over the link limit.
                continue;
            }
            if (!rpcServiceFactory.isRemoteAddrForbidden(brokerInfo.getBrokerAddr())) {
                allowedBrokerIds.add(brokerInfo.getBrokerId());
            }
        }
        if (allowedBrokerIds.isEmpty()) {
            throw new TubeClientException("The brokers of topic are all forbidden!");
        }
        // Step 2: decide how many of the allowed brokers may actually be returned.
        int selectCount = allowedBrokerIds.size();
        int allowedCount = selectCount;
        if (currentWaitCount > this.clientConfig.getSessionWarnDelayedMsgCount()) {
            allowedCount =
                    (int) Math.rint(selectCount * (1 - this.clientConfig.getSessionWarnForbiddenRate()));
        }
        // Step 3: gather the partitions of the selected brokers.
        List<Partition> partList = new ArrayList<>();
        if ((this.cachedLinkQualities.isEmpty()) || (selectCount == allowedCount)) {
            // No throttling to apply (or nothing to rank by): return every allowed broker.
            for (Integer selBrokerId : allowedBrokerIds) {
                partList.addAll(brokerPartList.get(selBrokerId));
            }
        } else {
            // A hash set keeps the membership test below constant-time per broker.
            Set<Integer> cachedBrokerIds = new HashSet<>();
            for (Map.Entry<Integer, BrokerStatsDltTuple> brokerEntry : this.cachedLinkQualities) {
                cachedBrokerIds.add(brokerEntry.getKey());
            }
            // First take the brokers that have no cached link-quality entry.
            for (Integer selBrokerId : allowedBrokerIds) {
                if (!cachedBrokerIds.contains(selBrokerId)) {
                    partList.addAll(brokerPartList.get(selBrokerId));
                    allowedCount--;
                }
                if (allowedCount <= 0) {
                    break;
                }
            }
            // Then fill the remaining quota following the order of cachedLinkQualities.
            if (allowedCount > 0) {
                for (Map.Entry<Integer, BrokerStatsDltTuple> brokerEntry : this.cachedLinkQualities) {
                    if (allowedBrokerIds.contains(brokerEntry.getKey())) {
                        partList.addAll(brokerPartList.get(brokerEntry.getKey()));
                        allowedCount--;
                    }
                    if (allowedCount <= 0) {
                        break;
                    }
                }
            }
        }
        return partList;
    }