public void checkAndReviseMetrics()

in store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java [1186:1300]


    /**
     * Recounts the timer metrics of low-volume topics from the timer log and writes the
     * recounted values back into {@code timerMetrics}.
     *
     * <p>Steps:
     * <ol>
     *   <li>Split the entries of {@code timerMetrics.getTimingCount()} into "small" topics
     *       (count below {@code storeConfig.getTimerMetricSmallThreshold()}) and "big" topics.
     *       Hash codes shared by two small topics are remembered as collisions.</li>
     *   <li>Drop every small topic whose hash code equals the hash code of a big topic; such
     *       topics are not revised.</li>
     *   <li>Scan the timer log, starting from the position returned by
     *       {@code timerWheel.checkPhyPos(currReadTimeMs, 0)}. Every unit whose delayed time is
     *       not before the read time and whose hash code belongs to a remaining small topic
     *       contributes {@code -1} (when {@code needDelete(magic)} is true) or {@code +1} to a
     *       fresh counter. For colliding hash codes the topic is taken from the stored message's
     *       {@code PROPERTY_REAL_TOPIC} property.</li>
     *   <li>If the scan finished without an exception, log each revision and put the fresh
     *       counters into {@code timerMetrics.getTimingCount()}; otherwise nothing is written.</li>
     * </ol>
     */
    public void checkAndReviseMetrics() {
        Map<String, TimerMetrics.Metric> lowVolume = new HashMap<>();
        Map<String, TimerMetrics.Metric> highVolume = new HashMap<>();
        Map<Integer, String> topicByHash = new HashMap<>();
        Set<Integer> ambiguousHashes = new HashSet<>();

        // Classify every known topic as low-volume or high-volume and index the low-volume ones by hash.
        for (Map.Entry<String, TimerMetrics.Metric> counted : timerMetrics.getTimingCount().entrySet()) {
            String topic = counted.getKey();
            TimerMetrics.Metric metric = counted.getValue();
            if (metric.getCount().get() >= storeConfig.getTimerMetricSmallThreshold()) {
                highVolume.put(topic, metric);
                continue;
            }
            lowVolume.put(topic, metric);
            int code = hashTopicForMetrics(topic);
            if (topicByHash.containsKey(code)) {
                String earlierTopic = topicByHash.get(code);
                LOGGER.warn("[CheckAndReviseMetrics]Metric hash collision between small-small code:{} small topic:{}{} small topic:{}{}", code,
                    topic, metric,
                    earlierTopic, lowVolume.get(earlierTopic));
                ambiguousHashes.add(code);
            }
            topicByHash.put(code, topic);
        }

        // A low-volume topic that shares its hash with a high-volume topic is excluded from the revision.
        for (Map.Entry<String, TimerMetrics.Metric> bigEntry : highVolume.entrySet()) {
            if (!topicByHash.containsKey(hashTopicForMetrics(bigEntry.getKey()))) {
                continue;
            }
            for (Iterator<Map.Entry<String, TimerMetrics.Metric>> it = lowVolume.entrySet().iterator(); it.hasNext(); ) {
                Map.Entry<String, TimerMetrics.Metric> smallEntry = it.next();
                if (hashTopicForMetrics(smallEntry.getKey()) != hashTopicForMetrics(bigEntry.getKey())) {
                    continue;
                }
                LOGGER.warn("[CheckAndReviseMetrics]Metric hash collision between small-big code:{} small topic:{}{} big topic:{}{}", hashTopicForMetrics(smallEntry.getKey()),
                    smallEntry.getKey(), smallEntry.getValue(),
                    bigEntry.getKey(), bigEntry.getValue());
                it.remove();
            }
        }

        // Rebuild the hash index from the surviving low-volume topics and give each one a fresh counter.
        topicByHash.clear();
        Map<String, TimerMetrics.Metric> recounted = new HashMap<>();
        for (String topic : lowVolume.keySet()) {
            recounted.put(topic, new TimerMetrics.Metric());
            topicByHash.put(hashTopicForMetrics(topic), topic);
        }

        // Walk the timer log one whole buffer at a time.
        long readTimeMs = currReadTimeMs;
        long scanOffset = timerWheel.checkPhyPos(readTimeMs, 0);
        LinkedList<SelectMappedBufferResult> heldBuffers = new LinkedList<>();
        boolean scanCompleted = false;
        try {
            SelectMappedBufferResult segment;
            while ((segment = timerLog.getWholeBuffer(scanOffset)) != null) {
                // Keep the buffer so it is released in the finally block below.
                heldBuffers.add(segment);
                ByteBuffer units = segment.getByteBuffer();
                for (int pos = 0; pos < segment.getSize(); pos += TimerLog.UNIT_SIZE) {
                    units.position(pos);
                    units.getInt(); // size, not needed here
                    units.getLong(); // prev pos, not needed here
                    int magic = units.getInt();
                    long enqueueTime = units.getLong();
                    int delay = units.getInt();
                    long offsetPy = units.getLong();
                    int sizePy = units.getInt();
                    int code = units.getInt();

                    long deliverTime = enqueueTime + delay;
                    // Skip units that are due before the read time or whose hash is not under revision.
                    if (deliverTime < readTimeMs || !topicByHash.containsKey(code)) {
                        continue;
                    }

                    String owner;
                    if (!ambiguousHashes.contains(code)) {
                        owner = topicByHash.get(code);
                    } else {
                        // The hash is shared by several low-volume topics: read the topic from the message itself.
                        MessageExt stored = getMessageByCommitOffset(offsetPy, sizePy);
                        owner = null == stored ? null : stored.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
                    }

                    if (null == owner || !recounted.containsKey(owner)) {
                        LOGGER.warn("[CheckAndReviseMetrics]Unexpected topic in checking timer metrics topic:{} code:{} offsetPy:{} size:{}", owner, code, offsetPy, sizePy);
                        continue;
                    }
                    TimerMetrics.Metric counter = recounted.get(owner);
                    counter.getCount().addAndGet(needDelete(magic) ? -1 : 1);
                }
                // A buffer shorter than a full file ends the scan.
                if (segment.getSize() < timerLogFileSize) {
                    break;
                }
                scanOffset += timerLogFileSize;
            }
            scanCompleted = true;
        } catch (Exception e) {
            LOGGER.error("[CheckAndReviseMetrics]Unknown error in checkAndReviseMetrics and abort", e);
        } finally {
            for (SelectMappedBufferResult held : heldBuffers) {
                if (held != null) {
                    held.release();
                }
            }
        }

        // Only publish the recounted values when the whole scan succeeded.
        if (!scanCompleted) {
            return;
        }
        for (Map.Entry<String, TimerMetrics.Metric> revised : recounted.entrySet()) {
            LOGGER.info("[CheckAndReviseMetrics]Revise metric for topic {} from {} to {}", revised.getKey(), lowVolume.get(revised.getKey()), revised.getValue());
        }
        timerMetrics.getTimingCount().putAll(recounted);
    }