in store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java [906:1015]
public int dequeue() throws Exception {
if (storeConfig.isTimerStopDequeue()) {
return -1;
}
if (!isRunningDequeue()) {
return -1;
}
if (currReadTimeMs >= currWriteTimeMs) {
return -1;
}
Slot slot = timerWheel.getSlot(currReadTimeMs);
if (-1 == slot.timeMs) {
moveReadTime();
return 0;
}
try {
//clear the flag
dequeueStatusChangeFlag = false;
long currOffsetPy = slot.lastPos;
Set<String> deleteUniqKeys = new ConcurrentSkipListSet<>();
LinkedList<TimerRequest> normalMsgStack = new LinkedList<>();
LinkedList<TimerRequest> deleteMsgStack = new LinkedList<>();
LinkedList<SelectMappedBufferResult> sbrs = new LinkedList<>();
SelectMappedBufferResult timeSbr = null;
//read the timer log one by one
while (currOffsetPy != -1) {
perfCounterTicks.startTick("dequeue_read_timerlog");
if (null == timeSbr || timeSbr.getStartOffset() > currOffsetPy) {
timeSbr = timerLog.getWholeBuffer(currOffsetPy);
if (null != timeSbr) {
sbrs.add(timeSbr);
}
}
if (null == timeSbr) {
break;
}
long prevPos = -1;
try {
int position = (int) (currOffsetPy % timerLogFileSize);
timeSbr.getByteBuffer().position(position);
timeSbr.getByteBuffer().getInt(); //size
prevPos = timeSbr.getByteBuffer().getLong();
int magic = timeSbr.getByteBuffer().getInt();
long enqueueTime = timeSbr.getByteBuffer().getLong();
long delayedTime = timeSbr.getByteBuffer().getInt() + enqueueTime;
long offsetPy = timeSbr.getByteBuffer().getLong();
int sizePy = timeSbr.getByteBuffer().getInt();
TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, enqueueTime, magic);
timerRequest.setDeleteList(deleteUniqKeys);
if (needDelete(magic) && !needRoll(magic)) {
deleteMsgStack.add(timerRequest);
} else {
normalMsgStack.addFirst(timerRequest);
}
} catch (Exception e) {
LOGGER.error("Error in dequeue_read_timerlog", e);
} finally {
currOffsetPy = prevPos;
perfCounterTicks.endTick("dequeue_read_timerlog");
}
}
if (deleteMsgStack.size() == 0 && normalMsgStack.size() == 0) {
LOGGER.warn("dequeue time:{} but read nothing from timerLog", currReadTimeMs);
}
for (SelectMappedBufferResult sbr : sbrs) {
if (null != sbr) {
sbr.release();
}
}
if (!isRunningDequeue()) {
return -1;
}
CountDownLatch deleteLatch = new CountDownLatch(deleteMsgStack.size());
//read the delete msg: the msg used to mark another msg is deleted
for (List<TimerRequest> deleteList : splitIntoLists(deleteMsgStack)) {
for (TimerRequest tr : deleteList) {
tr.setLatch(deleteLatch);
}
dequeueGetQueue.put(deleteList);
}
//do we need to use loop with tryAcquire
checkDequeueLatch(deleteLatch, currReadTimeMs);
CountDownLatch normalLatch = new CountDownLatch(normalMsgStack.size());
//read the normal msg
for (List<TimerRequest> normalList : splitIntoLists(normalMsgStack)) {
for (TimerRequest tr : normalList) {
tr.setLatch(normalLatch);
}
dequeueGetQueue.put(normalList);
}
checkDequeueLatch(normalLatch, currReadTimeMs);
// if master -> slave -> master, then the read time move forward, and messages will be lossed
if (dequeueStatusChangeFlag) {
return -1;
}
if (!isRunningDequeue()) {
return -1;
}
moveReadTime();
} catch (Throwable t) {
LOGGER.error("Unknown error in dequeue process", t);
if (storeConfig.isTimerSkipUnknownError()) {
moveReadTime();
}
}
return 1;
}