in broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java [307:439]
/**
 * Scans the revive queue and collects pop check points together with the acks that refer to them.
 *
 * <p>The scan starts one past the larger of {@code reviveOffset} and the offset committed for
 * {@link PopAckConstants#REVIVE_GROUP} on {@code reviveTopic}/{@code queueId}. Each message is
 * dispatched on its tag:
 * <ul>
 *   <li>{@code CK_TAG}: the body is parsed into a {@link PopCheckPoint} and stored in the result
 *       map under a key built from topic, consumer group, queue id, start offset, pop time and
 *       broker name;</li>
 *   <li>{@code ACK_TAG} / {@code BATCH_ACK_TAG}: the matching check point is looked up by the same
 *       key and the acked offset(s) are flagged in its bitmap. When no check point is found and
 *       {@code enableSkipLongAwaitingAck} is on, {@code mockCkForAck} is asked to register one in
 *       a side map that is merged into the result when the scan ends.</li>
 * </ul>
 *
 * <p>The scan ends when any of the following holds: {@code shouldRunPopRevive} is false; the queue
 * is drained and {@code endTime - firstRt} exceeds {@code ackTimeInterval + SECOND}; the queue
 * stayed empty for more than {@code 4 * SECOND} worth of 100 ms polls; the configured
 * {@code reviveScanTime} elapsed; or the thread was interrupted while waiting for new messages.
 *
 * @param consumeReviveObj scan result holder; {@code oldOffset} is set before scanning, {@code map}
 *                         is filled while scanning and {@code endTime} is set when the scan ends
 */
protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
    HashMap<String, PopCheckPoint> map = consumeReviveObj.map;
    // Entries registered by mockCkForAck for acks that have no check point in `map`.
    HashMap<String, PopCheckPoint> mockPointMap = new HashMap<>();
    long startScanTime = System.currentTimeMillis();
    // Largest deliver time seen so far (possibly moved forward to "now" while the queue is idle).
    long endTime = 0;
    long consumeOffset = this.brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId);
    long oldOffset = Math.max(reviveOffset, consumeOffset);
    consumeReviveObj.oldOffset = oldOffset;
    POP_LOGGER.info("reviveQueueId={}, old offset is {} ", queueId, oldOffset);
    long offset = oldOffset + 1;
    // Number of consecutive empty polls; reset as soon as a non-empty batch arrives.
    int noMsgCount = 0;
    // Revive time of the first check point (real or mocked) met in this scan; 0 means "none yet".
    long firstRt = 0;
    // offset self amend
    while (true) {
        if (!shouldRunPopRevive) {
            POP_LOGGER.info("slave skip scan, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
            break;
        }
        List<MessageExt> messageExts = getReviveMessage(offset, queueId);
        if (messageExts == null || messageExts.isEmpty()) {
            long old = endTime;
            long timerDelay = brokerController.getMessageStore().getTimerMessageStore().getDequeueBehind();
            long commitLogDelay = brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehind();
            // move endTime: only when something was read before, the last deliver time is more than
            // 3 * SECOND behind the wall clock and the timer store reports no backlog on either side.
            if (endTime != 0 && System.currentTimeMillis() - endTime > 3 * PopAckConstants.SECOND && timerDelay <= 0 && commitLogDelay <= 0) {
                endTime = System.currentTimeMillis();
            }
            POP_LOGGER.debug("reviveQueueId={}, offset is {}, can not get new msg, old endTime {}, new endTime {}, timerDelay={}, commitLogDelay={} ",
                queueId, offset, old, endTime, timerDelay, commitLogDelay);
            if (endTime - firstRt > PopAckConstants.ackTimeInterval + PopAckConstants.SECOND) {
                break;
            }
            noMsgCount++;
            // Fixme: why sleep is useful here?
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Do not lose the interrupt: restore the flag for the caller and stop polling,
                // leaving through the same path as the idle timeout below.
                Thread.currentThread().interrupt();
                POP_LOGGER.info("reviveQueueId={}, scan interrupted while waiting for new msg", queueId);
                break;
            }
            // Each empty poll accounts for the 100 ms slept above.
            if (noMsgCount * 100L > 4 * PopAckConstants.SECOND) {
                break;
            }
            continue;
        }
        noMsgCount = 0;
        if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) {
            POP_LOGGER.info("reviveQueueId={}, scan timeout ", queueId);
            break;
        }
        // NOTE: every `continue` inside this loop also skips the endTime update at its bottom.
        for (MessageExt messageExt : messageExts) {
            if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {
                String raw = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.info("reviveQueueId={},find ck, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
                }
                PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class);
                // Unparsable or incomplete check point: it cannot be keyed, so skip it.
                if (point == null || point.getTopic() == null || point.getCId() == null) {
                    continue;
                }
                map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime() + point.getBrokerName(), point);
                PopMetricsManager.incPopReviveCkGetCount(point, queueId);
                point.setReviveOffset(messageExt.getQueueOffset());
                if (firstRt == 0) {
                    firstRt = point.getReviveTime();
                }
            } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
                String raw = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.info("reviveQueueId={}, find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
                }
                AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
                if (ackMsg == null) {
                    // A body that parses to null would otherwise abort the whole scan with an NPE.
                    POP_LOGGER.error("reviveQueueId={}, skip unparsable ack, offset:{}, raw : {}", queueId, messageExt.getQueueOffset(), raw);
                    continue;
                }
                PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
                // Acks without a broker name are attributed to this broker.
                String brokerName = StringUtils.isNotBlank(ackMsg.getBrokerName()) ?
                    ackMsg.getBrokerName() : brokerController.getBrokerConfig().getBrokerName();
                String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + brokerName;
                PopCheckPoint point = map.get(mergeKey);
                if (point == null) {
                    if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
                        continue;
                    }
                    if (mockCkForAck(messageExt, ackMsg, mergeKey, mockPointMap) && firstRt == 0) {
                        firstRt = mockPointMap.get(mergeKey).getReviveTime();
                    }
                } else {
                    int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
                    if (indexOfAck > -1) {
                        point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));
                    } else {
                        POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);
                    }
                }
            } else if (PopAckConstants.BATCH_ACK_TAG.equals(messageExt.getTags())) {
                String raw = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                if (brokerController.getBrokerConfig().isEnablePopLog()) {
                    POP_LOGGER.info("reviveQueueId={}, find batch ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
                }
                BatchAckMsg bAckMsg = JSON.parseObject(raw, BatchAckMsg.class);
                if (bAckMsg == null) {
                    // Same guard as for single acks: skip instead of failing the scan.
                    POP_LOGGER.error("reviveQueueId={}, skip unparsable batch ack, offset:{}, raw : {}", queueId, messageExt.getQueueOffset(), raw);
                    continue;
                }
                PopMetricsManager.incPopReviveAckGetCount(bAckMsg, queueId);
                String brokerName = StringUtils.isNotBlank(bAckMsg.getBrokerName()) ?
                    bAckMsg.getBrokerName() : brokerController.getBrokerConfig().getBrokerName();
                String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime() + brokerName;
                PopCheckPoint point = map.get(mergeKey);
                if (point == null) {
                    if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
                        continue;
                    }
                    if (mockCkForAck(messageExt, bAckMsg, mergeKey, mockPointMap) && firstRt == 0) {
                        firstRt = mockPointMap.get(mergeKey).getReviveTime();
                    }
                } else {
                    List<Long> ackOffsetList = bAckMsg.getAckOffsetList();
                    for (Long ackOffset : ackOffsetList) {
                        int indexOfAck = point.indexOfAck(ackOffset);
                        if (indexOfAck > -1) {
                            point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));
                        } else {
                            POP_LOGGER.error("invalid batch ack index, {}, {}", bAckMsg, point);
                        }
                    }
                }
            }
            // Track the latest deliver time among the messages processed so far.
            long deliverTime = messageExt.getDeliverTimeMs();
            if (deliverTime > endTime) {
                endTime = deliverTime;
            }
        }
        offset = offset + messageExts.size();
    }
    // Mocked check points are merged only now, so lookups during the scan never see them in `map`.
    consumeReviveObj.map.putAll(mockPointMap);
    consumeReviveObj.endTime = endTime;
}