in mqtt-common/src/main/java/org/apache/rocketmq/mqtt/common/util/StatUtil.java [277:306]
public static void addPv(String key, long totalPv, boolean success) {
if (invokeCache.size() > MAX_KEY_NUM || secondInvokeCache.size() > MAX_KEY_NUM) {
return;
}
if (totalPv <= 0) {
return;
}
Invoke invoke = getAndSetInvoke(key);
if (invoke == null) {
return;
}
invoke.totalPv.addAndGet(totalPv);
if (!success) {
invoke.failPv.addAndGet(totalPv);
}
long now = nowSecond();
AtomicLong oldSecond = invoke.second;
if (oldSecond.get() == now) {
invoke.secondPv.addAndGet((int)totalPv);
} else {
if (oldSecond.compareAndSet(oldSecond.get(), now)) {
if (invoke.secondPv.get() > invoke.topSecondPv.get()) {
invoke.topSecondPv.set(invoke.secondPv.get());
}
invoke.secondPv.set((int)totalPv);
} else {
invoke.secondPv.addAndGet((int)totalPv);
}
}
}