in inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/qltystats/DefaultBrokerRcvQltyStats.java [236:366]
public void statisticDltBrokerStatus() {
    // #lizard forgives
    //
    // Periodic quality check. When the session statistic interval has elapsed the
    // cached link qualities are refreshed; when the forbidden check interval has
    // elapsed, brokerForbiddenMap is re-evaluated: entries older than the check
    // duration are released, and brokers whose success-receive delta falls far
    // below that of their peers are added, bounded by the configured forbidden rate.
    long currentTime = System.currentTimeMillis();
    if ((currentTime - this.lastLinkStatisticTime
            < this.clientConfig.getSessionStatisticCheckDuration())
            && (currentTime - this.lastQualityStatisticTime
                    < this.clientConfig.getMaxForbiddenCheckDuration())) {
        // Neither interval has elapsed yet, nothing to do.
        return;
    }
    if (currentTime - this.lastLinkStatisticTime
            > this.clientConfig.getSessionStatisticCheckDuration()) {
        this.lastLinkStatisticTime = System.currentTimeMillis();
        this.cachedLinkQualities = getCurBrokerSentWaitStats();
    }
    if (System.currentTimeMillis() - this.lastQualityStatisticTime
            < this.clientConfig.getMaxForbiddenCheckDuration()) {
        // Only the link statistics were due.
        return;
    }
    // One builder is reused for every log line; it is cleared after each use.
    StringBuilder sBuilder = new StringBuilder(512);
    this.lastQualityStatisticTime = System.currentTimeMillis();
    // Print the current forbidden/unavailable state on every 10th quality check.
    if ((printCount++ % 10 == 0) && (!brokerStats.isEmpty())) {
        if (!brokerForbiddenMap.isEmpty()) {
            logger.info(sBuilder.append("[status check]: current response quality respForbiddenMap is ")
                    .append(brokerForbiddenMap.toString()).toString());
            sBuilder.delete(0, sBuilder.length());
        }
        if (!rpcServiceFactory.getForbiddenAddrMap().isEmpty()) {
            logger.info(sBuilder.append("[status check]: current request quality reqForbiddenMap is ")
                    .append(rpcServiceFactory.getForbiddenAddrMap().toString()).toString());
            sBuilder.delete(0, sBuilder.length());
        }
        if (!rpcServiceFactory.getUnavailableBrokerMap().isEmpty()) {
            logger.info(sBuilder.append("[status check]: current service unavailable brokerMap is ")
                    .append(rpcServiceFactory.getUnavailableBrokerMap().toString()).toString());
            sBuilder.delete(0, sBuilder.length());
        }
    }
    boolean changed = false;
    long totalSuccRecNum = 0L;
    HashMap<Integer, BrokerStatsDltTuple> needSelNumTMap =
            new HashMap<>();
    // Collect the per-broker deltas. The delta getters are invoked for every broker,
    // including forbidden ones; only non-forbidden brokers become candidates.
    for (Map.Entry<Integer, BrokerStatsItemSet> brokerStatsEntry : brokerStats.entrySet()) {
        // Use the entry's own value instead of a second map lookup by key.
        BrokerStatsItemSet curStatsItemSet = brokerStatsEntry.getValue();
        if (curStatsItemSet == null) {
            continue;
        }
        long sendNum = curStatsItemSet.getDltAndSnapshotSendNum();
        long succRecvNum = curStatsItemSet.getDltAndSnapshotRecSucNum();
        if (!brokerForbiddenMap.containsKey(brokerStatsEntry.getKey())) {
            totalSuccRecNum += succRecvNum;
            needSelNumTMap.put(brokerStatsEntry.getKey(),
                    new BrokerStatsDltTuple(succRecvNum, sendNum));
        }
    }
    // Release brokers that have been forbidden longer than the check duration.
    // NOTE(review): removing while iterating presumably relies on brokerForbiddenMap
    // being a concurrent map - confirm against the field declaration.
    for (Map.Entry<Integer, Long> brokerForbiddenEntry : brokerForbiddenMap.entrySet()) {
        if (System.currentTimeMillis() - brokerForbiddenEntry.getValue()
                > this.clientConfig.getMaxForbiddenCheckDuration()) {
            changed = true;
            brokerForbiddenMap.remove(brokerForbiddenEntry.getKey());
        }
    }
    if (needSelNumTMap.isEmpty()) {
        // No candidate brokers to evaluate.
        if (changed && !brokerForbiddenMap.isEmpty()) {
            logger.info(sBuilder.append("End statistic 1: forbidden Broker Set is ")
                    .append(brokerForbiddenMap.toString()).toString());
            sBuilder.delete(0, sBuilder.length());
        }
        return;
    }
    List<Map.Entry<Integer, BrokerStatsDltTuple>> lstData =
            new ArrayList<>(needSelNumTMap.entrySet());
    // Sort the list in ascending order
    Collections.sort(lstData, new BrokerStatsDltTupleComparator(false));
    int filteredBrokerListSize = lstData.size();
    // Number of additional brokers that may be forbidden: the configured rate applied
    // to all brokers considered here, minus those that are already forbidden.
    int needHoldCount =
            (int) Math.rint((filteredBrokerListSize + brokerForbiddenMap.size())
                    * clientConfig.getMaxSentForbiddenRate());
    needHoldCount -= brokerForbiddenMap.size();
    if (needHoldCount <= 0) {
        if (changed && !brokerForbiddenMap.isEmpty()) {
            logger.info(sBuilder.append("End statistic 2: forbidden Broker Set is ")
                    .append(brokerForbiddenMap.toString()).toString());
            sBuilder.delete(0, sBuilder.length());
        }
        return;
    }
    // Threshold is 20% of the average success count of the remaining peers after
    // dropping the first entry (small lists) or the first two and the last entry.
    long avgSuccRecNumThreshold = 0L;
    if (filteredBrokerListSize <= 3) {
        totalSuccRecNum -= lstData.get(0).getValue().getSuccRecvNum();
        int peerCount = filteredBrokerListSize - 1;
        // With a single candidate there are no peers to average over; dividing would
        // throw ArithmeticException. Keep the threshold at 0 so that the only
        // remaining broker is never forbidden.
        if (peerCount > 0) {
            avgSuccRecNumThreshold = (long) (totalSuccRecNum / peerCount * 0.2);
        }
    } else {
        totalSuccRecNum -= lstData.get(0).getValue().getSuccRecvNum();
        totalSuccRecNum -= lstData.get(1).getValue().getSuccRecvNum();
        totalSuccRecNum -= lstData.get(lstData.size() - 1).getValue().getSuccRecvNum();
        avgSuccRecNumThreshold = (long) (totalSuccRecNum / (filteredBrokerListSize - 3) * 0.2);
    }
    ConcurrentHashMap<Integer, Boolean> tmpBrokerForbiddenMap =
            new ConcurrentHashMap<>();
    for (Map.Entry<Integer, BrokerStatsDltTuple> brokerDltNumEntry : lstData) {
        long succRecvNum = brokerDltNumEntry.getValue().getSuccRecvNum();
        long succSendNumThreshold = (long) (brokerDltNumEntry.getValue().getSendNum() * 0.1);
        // Forbid only when the broker is below the peer threshold, 10% of its sends
        // exceeds 2, and fewer than 10% of its sends succeeded.
        if ((succRecvNum < avgSuccRecNumThreshold) && (succSendNumThreshold > 2)
                && (succRecvNum < succSendNumThreshold)) {
            tmpBrokerForbiddenMap.put(brokerDltNumEntry.getKey(), true);
            if (logger.isDebugEnabled()) {
                logger.debug(sBuilder.append("[forbidden statistic] brokerId=")
                        .append(brokerDltNumEntry.getKey()).append(",succRecvNum=")
                        .append(succRecvNum).append(",avgSuccRecNumThreshold=")
                        .append(avgSuccRecNumThreshold).append(",succSendNumThreshold=")
                        .append(succSendNumThreshold).toString());
                sBuilder.delete(0, sBuilder.length());
            }
        }
        // Stop once the quota is filled or the current entry reaches the threshold.
        if ((tmpBrokerForbiddenMap.size() >= needHoldCount)
                || (succRecvNum >= avgSuccRecNumThreshold)) {
            break;
        }
    }
    // Record the newly forbidden brokers with the time they were forbidden.
    for (Integer tmpBrokerId : tmpBrokerForbiddenMap.keySet()) {
        changed = true;
        brokerForbiddenMap.put(tmpBrokerId, System.currentTimeMillis());
    }
    if (changed && !brokerForbiddenMap.isEmpty()) {
        logger.info(sBuilder.append("End statistic 3: forbidden Broker Set is ")
                .append(brokerForbiddenMap.toString()).toString());
        sBuilder.delete(0, sBuilder.length());
    }
}