in inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java [136:219]
public List<Partition> getAllowedBrokerPartitions(
Map<Integer, List<Partition>> brokerPartList) throws TubeClientException {
// #lizard forgives
List<Partition> partList = new ArrayList<>();
if ((brokerPartList == null) || (brokerPartList.isEmpty())) {
throw new TubeClientException("Null brokers to select sent, please try later!");
}
long currentWaitCount = this.curTotalSentRequestNum.get();
if (currentWaitCount >= this.clientConfig.getSessionMaxAllowedDelayedMsgCount()) {
throw new TubeClientException(new StringBuilder(512)
.append("Current delayed messages over max allowed count, allowed is ")
.append(this.clientConfig.getSessionMaxAllowedDelayedMsgCount())
.append(", current count is ").append(currentWaitCount).toString());
}
long curTime = System.currentTimeMillis();
Set<Integer> allowedBrokerIds = new HashSet<>();
ConcurrentHashMap<Integer, Long> unAvailableBrokerMap = rpcServiceFactory.getUnavailableBrokerMap();
for (Map.Entry<Integer, List<Partition>> oldBrokerPartEntry : brokerPartList.entrySet()) {
Long lastAddTime = unAvailableBrokerMap.get(oldBrokerPartEntry.getKey());
if ((lastAddTime != null)
&& (curTime - lastAddTime <= clientConfig.getUnAvailableFbdDurationMs())) {
continue;
}
if (this.brokerForbiddenMap.containsKey(oldBrokerPartEntry.getKey())) {
continue;
}
List<Partition> partitionList = oldBrokerPartEntry.getValue();
if ((partitionList != null) && !partitionList.isEmpty()) {
Partition partition = partitionList.get(0);
if (partition != null) {
BrokerInfo brokerInfo = partition.getBroker();
AtomicLong curMaxSentNum =
this.brokerCurSentReqNum.get(brokerInfo.getBrokerId());
if ((curMaxSentNum != null)
&& (curMaxSentNum.get() > this.clientConfig.getLinkMaxAllowedDelayedMsgCount())) {
continue;
}
if (!rpcServiceFactory.isRemoteAddrForbidden(brokerInfo.getBrokerAddr())) {
allowedBrokerIds.add(brokerInfo.getBrokerId());
}
}
}
}
if (allowedBrokerIds.isEmpty()) {
throw new TubeClientException("The brokers of topic are all forbidden!");
}
int selectCount = allowedBrokerIds.size();
int allowedCount = selectCount;
if (currentWaitCount > this.clientConfig.getSessionWarnDelayedMsgCount()) {
allowedCount =
(int) Math.rint(selectCount * (1 - this.clientConfig.getSessionWarnForbiddenRate()));
}
if ((this.cachedLinkQualities.isEmpty()) || (selectCount == allowedCount)) {
for (Integer selBrokerId : allowedBrokerIds) {
partList.addAll(brokerPartList.get(selBrokerId));
}
} else {
List<Integer> cachedBrokerIds = new ArrayList<>();
for (Map.Entry<Integer, BrokerStatsDltTuple> brokerEntry : this.cachedLinkQualities) {
cachedBrokerIds.add(brokerEntry.getKey());
}
for (Integer selBrokerId : allowedBrokerIds) {
if (!cachedBrokerIds.contains(selBrokerId)) {
partList.addAll(brokerPartList.get(selBrokerId));
allowedCount--;
}
if (allowedCount <= 0) {
break;
}
}
if (allowedCount > 0) {
for (Map.Entry<Integer, BrokerStatsDltTuple> brokerEntry : this.cachedLinkQualities) {
if (allowedBrokerIds.contains(brokerEntry.getKey())) {
partList.addAll(brokerPartList.get(brokerEntry.getKey()));
allowedCount--;
}
if (allowedCount <= 0) {
break;
}
}
}
}
return partList;
}