in store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java [1186:1300]
public void checkAndReviseMetrics() {
Map<String, TimerMetrics.Metric> smallOnes = new HashMap<>();
Map<String, TimerMetrics.Metric> bigOnes = new HashMap<>();
Map<Integer, String> smallHashs = new HashMap<>();
Set<Integer> smallHashCollisions = new HashSet<>();
for (Map.Entry<String, TimerMetrics.Metric> entry : timerMetrics.getTimingCount().entrySet()) {
if (entry.getValue().getCount().get() < storeConfig.getTimerMetricSmallThreshold()) {
smallOnes.put(entry.getKey(), entry.getValue());
int hash = hashTopicForMetrics(entry.getKey());
if (smallHashs.containsKey(hash)) {
LOGGER.warn("[CheckAndReviseMetrics]Metric hash collision between small-small code:{} small topic:{}{} small topic:{}{}", hash,
entry.getKey(), entry.getValue(),
smallHashs.get(hash), smallOnes.get(smallHashs.get(hash)));
smallHashCollisions.add(hash);
}
smallHashs.put(hash, entry.getKey());
} else {
bigOnes.put(entry.getKey(), entry.getValue());
}
}
//check the hash collision between small ons and big ons
for (Map.Entry<String, TimerMetrics.Metric> bjgEntry : bigOnes.entrySet()) {
if (smallHashs.containsKey(hashTopicForMetrics(bjgEntry.getKey()))) {
Iterator<Map.Entry<String, TimerMetrics.Metric>> smallIt = smallOnes.entrySet().iterator();
while (smallIt.hasNext()) {
Map.Entry<String, TimerMetrics.Metric> smallEntry = smallIt.next();
if (hashTopicForMetrics(smallEntry.getKey()) == hashTopicForMetrics(bjgEntry.getKey())) {
LOGGER.warn("[CheckAndReviseMetrics]Metric hash collision between small-big code:{} small topic:{}{} big topic:{}{}", hashTopicForMetrics(smallEntry.getKey()),
smallEntry.getKey(), smallEntry.getValue(),
bjgEntry.getKey(), bjgEntry.getValue());
smallIt.remove();
}
}
}
}
//refresh
smallHashs.clear();
Map<String, TimerMetrics.Metric> newSmallOnes = new HashMap<>();
for (String topic : smallOnes.keySet()) {
newSmallOnes.put(topic, new TimerMetrics.Metric());
smallHashs.put(hashTopicForMetrics(topic), topic);
}
//travel the timer log
long readTimeMs = currReadTimeMs;
long currOffsetPy = timerWheel.checkPhyPos(readTimeMs, 0);
LinkedList<SelectMappedBufferResult> sbrs = new LinkedList<>();
boolean hasError = false;
try {
while (true) {
SelectMappedBufferResult timeSbr = timerLog.getWholeBuffer(currOffsetPy);
if (timeSbr == null) {
break;
} else {
sbrs.add(timeSbr);
}
ByteBuffer bf = timeSbr.getByteBuffer();
for (int position = 0; position < timeSbr.getSize(); position += TimerLog.UNIT_SIZE) {
bf.position(position);
bf.getInt();//size
bf.getLong();//prev pos
int magic = bf.getInt(); //magic
long enqueueTime = bf.getLong();
long delayedTime = bf.getInt() + enqueueTime;
long offsetPy = bf.getLong();
int sizePy = bf.getInt();
int hashCode = bf.getInt();
if (delayedTime < readTimeMs) {
continue;
}
if (!smallHashs.containsKey(hashCode)) {
continue;
}
String topic = null;
if (smallHashCollisions.contains(hashCode)) {
MessageExt messageExt = getMessageByCommitOffset(offsetPy, sizePy);
if (null != messageExt) {
topic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
}
} else {
topic = smallHashs.get(hashCode);
}
if (null != topic && newSmallOnes.containsKey(topic)) {
newSmallOnes.get(topic).getCount().addAndGet(needDelete(magic) ? -1 : 1);
} else {
LOGGER.warn("[CheckAndReviseMetrics]Unexpected topic in checking timer metrics topic:{} code:{} offsetPy:{} size:{}", topic, hashCode, offsetPy, sizePy);
}
}
if (timeSbr.getSize() < timerLogFileSize) {
break;
} else {
currOffsetPy = currOffsetPy + timerLogFileSize;
}
}
} catch (Exception e) {
hasError = true;
LOGGER.error("[CheckAndReviseMetrics]Unknown error in checkAndReviseMetrics and abort", e);
} finally {
for (SelectMappedBufferResult sbr : sbrs) {
if (null != sbr) {
sbr.release();
}
}
}
if (!hasError) {
//update
for (String topic : newSmallOnes.keySet()) {
LOGGER.info("[CheckAndReviseMetrics]Revise metric for topic {} from {} to {}", topic, smallOnes.get(topic), newSmallOnes.get(topic));
}
timerMetrics.getTimingCount().putAll(newSmallOnes);
}
}