in store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java [1571:1659]
public void run() {
setState(AbstractStateService.START);
TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");
//Mark different rounds
boolean isRound = true;
Map<String ,MessageExt> avoidDeleteLose = new HashMap<>();
while (!this.isStopped()) {
try {
setState(AbstractStateService.WAITING);
List<TimerRequest> trs = dequeueGetQueue.poll(100L * precisionMs / 1000, TimeUnit.MILLISECONDS);
if (null == trs || trs.size() == 0) {
continue;
}
setState(AbstractStateService.RUNNING);
for (int i = 0; i < trs.size(); ) {
TimerRequest tr = trs.get(i);
boolean doRes = false;
try {
long start = System.currentTimeMillis();
MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
if (null != msgExt) {
if (needDelete(tr.getMagic()) && !needRoll(tr.getMagic())) {
//Clearing is performed once in each round.
//The deletion message is received first and the common message is received once
if (!isRound) {
isRound = true;
for (MessageExt messageExt: avoidDeleteLose.values()) {
addMetric(messageExt, 1);
}
avoidDeleteLose.clear();
}
if (msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null && tr.getDeleteList() != null) {
avoidDeleteLose.put(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY), msgExt);
tr.getDeleteList().add(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
}
tr.idempotentRelease();
doRes = true;
} else {
String uniqueKey = MessageClientIDSetter.getUniqID(msgExt);
if (null == uniqueKey) {
LOGGER.warn("No uniqueKey for msg:{}", msgExt);
}
//Mark ready for next round
if (isRound) {
isRound = false;
}
if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0
&& tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) {
avoidDeleteLose.remove(uniqueKey);
doRes = true;
tr.idempotentRelease();
perfCounterTicks.getCounter("dequeue_delete").flow(1);
} else {
tr.setMsg(msgExt);
while (!isStopped() && !doRes) {
doRes = dequeuePutQueue.offer(tr, 3, TimeUnit.SECONDS);
}
}
}
perfCounterTicks.getCounter("dequeue_get_msg").flow(System.currentTimeMillis() - start);
} else {
//the tr will never be processed afterwards, so idempotentRelease it
tr.idempotentRelease();
doRes = true;
perfCounterTicks.getCounter("dequeue_get_msg_miss").flow(System.currentTimeMillis() - start);
}
} catch (Throwable e) {
LOGGER.error("Unknown exception", e);
if (storeConfig.isTimerSkipUnknownError()) {
tr.idempotentRelease();
doRes = true;
} else {
holdMomentForUnknownError();
}
} finally {
if (doRes) {
i++;
}
}
}
trs.clear();
} catch (Throwable e) {
TimerMessageStore.LOGGER.error("Error occurred in " + getServiceName(), e);
}
}
TimerMessageStore.LOGGER.info(this.getServiceName() + " service end");
setState(AbstractStateService.END);
}