public int dequeue()

in store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java [906:1015]


    /**
     * Processes the timer-wheel slot at {@code currReadTimeMs}: walks the slot's chain of timer-log
     * records, turns each record into a {@link TimerRequest}, hands the requests to
     * {@code dequeueGetQueue} (delete-marker requests first, then the remaining ones), waits on a
     * latch for each batch group, and finally advances the read time.
     *
     * @return {@code -1} if dequeueing is disabled by config, not currently running, the read time has
     *         reached the write time, or the running state / status-change flag changed while the slot
     *         was being processed (the read time is NOT advanced in these cases);
     *         {@code 0} if the slot is empty ({@code slot.timeMs == -1}) and the read time was advanced;
     *         {@code 1} otherwise, including when an error was caught and logged
     * @throws Exception if a call made outside the guarded section (for example the slot lookup or
     *         advancing the read time) throws
     */
    public int dequeue() throws Exception {
        if (storeConfig.isTimerStopDequeue()) {
            return -1;
        }
        if (!isRunningDequeue()) {
            return -1;
        }
        if (currReadTimeMs >= currWriteTimeMs) {
            return -1;
        }

        Slot slot = timerWheel.getSlot(currReadTimeMs);
        if (-1 == slot.timeMs) {
            // Nothing scheduled for this tick: just step forward.
            moveReadTime();
            return 0;
        }
        try {
            //clear the flag
            dequeueStatusChangeFlag = false;

            long currOffsetPy = slot.lastPos;
            Set<String> deleteUniqKeys = new ConcurrentSkipListSet<>();
            LinkedList<TimerRequest> normalMsgStack = new LinkedList<>();
            LinkedList<TimerRequest> deleteMsgStack = new LinkedList<>();
            LinkedList<SelectMappedBufferResult> sbrs = new LinkedList<>();
            SelectMappedBufferResult timeSbr = null;
            try {
                //read the timer log one by one, following each record's prevPos link until it is -1
                while (currOffsetPy != -1) {
                    perfCounterTicks.startTick("dequeue_read_timerlog");
                    // Fetch a new buffer only when the current one starts after the wanted offset.
                    if (null == timeSbr || timeSbr.getStartOffset() > currOffsetPy) {
                        timeSbr = timerLog.getWholeBuffer(currOffsetPy);
                        if (null != timeSbr) {
                            sbrs.add(timeSbr);
                        }
                    }
                    if (null == timeSbr) {
                        // NOTE(review): this path leaves the tick started above without a matching
                        // endTick - confirm that is acceptable for perfCounterTicks.
                        break;
                    }
                    long prevPos = -1;
                    try {
                        int position = (int) (currOffsetPy % timerLogFileSize);
                        timeSbr.getByteBuffer().position(position);
                        // Record fields are read in order: size, prevPos, magic, enqueueTime,
                        // delay (added to enqueueTime), offsetPy, sizePy.
                        timeSbr.getByteBuffer().getInt(); //size
                        prevPos = timeSbr.getByteBuffer().getLong();
                        int magic = timeSbr.getByteBuffer().getInt();
                        long enqueueTime = timeSbr.getByteBuffer().getLong();
                        long delayedTime = timeSbr.getByteBuffer().getInt() + enqueueTime;
                        long offsetPy = timeSbr.getByteBuffer().getLong();
                        int sizePy = timeSbr.getByteBuffer().getInt();
                        TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, enqueueTime, magic);
                        timerRequest.setDeleteList(deleteUniqKeys);
                        if (needDelete(magic) && !needRoll(magic)) {
                            deleteMsgStack.add(timerRequest);
                        } else {
                            // Prepending reverses the traversal order of the chain.
                            normalMsgStack.addFirst(timerRequest);
                        }
                    } catch (Exception e) {
                        LOGGER.error("Error in dequeue_read_timerlog", e);
                    } finally {
                        // If the record could not be read far enough, prevPos is still -1 and the walk ends.
                        currOffsetPy = prevPos;
                        perfCounterTicks.endTick("dequeue_read_timerlog");
                    }
                }
                if (deleteMsgStack.isEmpty() && normalMsgStack.isEmpty()) {
                    LOGGER.warn("dequeue time:{} but read nothing from timerLog", currReadTimeMs);
                }
            } finally {
                // Always release the acquired buffers, even if the loop above threw before completing;
                // otherwise they would stay held once control reaches the outer catch.
                for (SelectMappedBufferResult sbr : sbrs) {
                    sbr.release();
                }
            }
            if (!isRunningDequeue()) {
                return -1;
            }
            CountDownLatch deleteLatch = new CountDownLatch(deleteMsgStack.size());
            //read the delete msg: the msg used to mark another msg is deleted
            for (List<TimerRequest> deleteList : splitIntoLists(deleteMsgStack)) {
                for (TimerRequest tr : deleteList) {
                    tr.setLatch(deleteLatch);
                }
                dequeueGetQueue.put(deleteList);
            }
            //do we need to use loop with tryAcquire
            checkDequeueLatch(deleteLatch, currReadTimeMs);

            CountDownLatch normalLatch = new CountDownLatch(normalMsgStack.size());
            //read the normal msg
            for (List<TimerRequest> normalList : splitIntoLists(normalMsgStack)) {
                for (TimerRequest tr : normalList) {
                    tr.setLatch(normalLatch);
                }
                dequeueGetQueue.put(normalList);
            }
            checkDequeueLatch(normalLatch, currReadTimeMs);
            // if master -> slave -> master happened meanwhile, moving the read time forward would lose messages
            if (dequeueStatusChangeFlag) {
                return -1;
            }
            if (!isRunningDequeue()) {
                return -1;
            }
            moveReadTime();
        } catch (Throwable t) {
            // NOTE(review): this also catches InterruptedException without restoring the
            // interrupt flag - confirm that is intended for the calling thread.
            LOGGER.error("Unknown error in dequeue process", t);
            if (storeConfig.isTimerSkipUnknownError()) {
                moveReadTime();
            }
        }
        return 1;
    }