public void run()

in store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java [1571:1659]


        public void run() {
            setState(AbstractStateService.START);
            TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");
            //Mark different rounds
            boolean isRound = true;
            Map<String ,MessageExt> avoidDeleteLose = new HashMap<>();
            while (!this.isStopped()) {
                try {
                    setState(AbstractStateService.WAITING);
                    List<TimerRequest> trs = dequeueGetQueue.poll(100L * precisionMs / 1000, TimeUnit.MILLISECONDS);
                    if (null == trs || trs.size() == 0) {
                        continue;
                    }
                    setState(AbstractStateService.RUNNING);
                    for (int i = 0; i < trs.size(); ) {
                        TimerRequest tr = trs.get(i);
                        boolean doRes = false;
                        try {
                            long start = System.currentTimeMillis();
                            MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
                            if (null != msgExt) {
                                if (needDelete(tr.getMagic()) && !needRoll(tr.getMagic())) {
                                    //Clearing is performed once in each round.
                                    //The deletion message is received first and the common message is received once
                                    if (!isRound) {
                                        isRound = true;
                                        for (MessageExt messageExt: avoidDeleteLose.values()) {
                                            addMetric(messageExt, 1);
                                        }
                                        avoidDeleteLose.clear();
                                    }
                                    if (msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null && tr.getDeleteList() != null) {

                                        avoidDeleteLose.put(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY), msgExt);
                                        tr.getDeleteList().add(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
                                    }
                                    tr.idempotentRelease();
                                    doRes = true;
                                } else {
                                    String uniqueKey = MessageClientIDSetter.getUniqID(msgExt);
                                    if (null == uniqueKey) {
                                        LOGGER.warn("No uniqueKey for msg:{}", msgExt);
                                    }
                                    //Mark ready for next round
                                    if (isRound) {
                                        isRound = false;
                                    }
                                    if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0
                                        && tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) {
                                        avoidDeleteLose.remove(uniqueKey);
                                        doRes = true;
                                        tr.idempotentRelease();
                                        perfCounterTicks.getCounter("dequeue_delete").flow(1);
                                    } else {
                                        tr.setMsg(msgExt);
                                        while (!isStopped() && !doRes) {
                                            doRes = dequeuePutQueue.offer(tr, 3, TimeUnit.SECONDS);
                                        }
                                    }
                                }
                                perfCounterTicks.getCounter("dequeue_get_msg").flow(System.currentTimeMillis() - start);
                            } else {
                                //the tr will never be processed afterwards, so idempotentRelease it
                                tr.idempotentRelease();
                                doRes = true;
                                perfCounterTicks.getCounter("dequeue_get_msg_miss").flow(System.currentTimeMillis() - start);
                            }
                        } catch (Throwable e) {
                            LOGGER.error("Unknown exception", e);
                            if (storeConfig.isTimerSkipUnknownError()) {
                                tr.idempotentRelease();
                                doRes = true;
                            } else {
                                holdMomentForUnknownError();
                            }
                        } finally {
                            if (doRes) {
                                i++;
                            }
                        }
                    }
                    trs.clear();
                } catch (Throwable e) {
                    TimerMessageStore.LOGGER.error("Error occurred in " + getServiceName(), e);
                }
            }
            TimerMessageStore.LOGGER.info(this.getServiceName() + " service end");
            setState(AbstractStateService.END);
        }