in broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java [162:354]
public void check(long transactionTimeout, int transactionCheckMax,
AbstractTransactionalMessageCheckListener listener) {
try {
String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
if (msgQueues == null || msgQueues.size() == 0) {
log.warn("The queue of topic is empty :" + topic);
return;
}
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
if (halfOffset < 0 || opOffset < 0) {
log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
halfOffset, opOffset);
continue;
}
List<Long> doneOpOffset = new ArrayList<>();
HashMap<Long, Long> removeMap = new HashMap<>();
HashMap<Long, HashSet<Long>> opMsgMap = new HashMap<Long, HashSet<Long>>();
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, opMsgMap, doneOpOffset);
if (null == pullResult) {
log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
messageQueue, halfOffset, opOffset);
continue;
}
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
long nextOpOffset = pullResult.getNextBeginOffset();
int putInQueueCount = 0;
int escapeFailCnt = 0;
while (true) {
if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
break;
}
Long removedOpOffset;
if ((removedOpOffset = removeMap.remove(i)) != null) {
log.debug("Half offset {} has been committed/rolled back", i);
opMsgMap.get(removedOpOffset).remove(i);
if (opMsgMap.get(removedOpOffset).size() == 0) {
opMsgMap.remove(removedOpOffset);
doneOpOffset.add(removedOpOffset);
}
} else {
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
if (msgExt == null) {
if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
i, messageQueue, getMessageNullCount, getResult.getPullResult());
i = getResult.getPullResult().getNextBeginOffset();
newOffset = i;
continue;
}
}
if (this.transactionalMessageBridge.getBrokerController().getBrokerConfig().isEnableSlaveActingMaster()
&& this.transactionalMessageBridge.getBrokerController().getMinBrokerIdInGroup()
== this.transactionalMessageBridge.getBrokerController().getBrokerIdentity().getBrokerId()
&& BrokerRole.SLAVE.equals(this.transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getBrokerRole())
) {
final MessageExtBrokerInner msgInner = this.transactionalMessageBridge.renewHalfMessageInner(msgExt);
final boolean isSuccess = this.transactionalMessageBridge.escapeMessage(msgInner);
if (isSuccess) {
escapeFailCnt = 0;
newOffset = i + 1;
i++;
} else {
log.warn("Escaping transactional message failed {} times! msgId(offsetId)={}, UNIQ_KEY(transactionId)={}",
escapeFailCnt + 1,
msgExt.getMsgId(),
msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
if (escapeFailCnt < MAX_RETRY_TIMES_FOR_ESCAPE) {
escapeFailCnt++;
Thread.sleep(100L * (2 ^ escapeFailCnt));
} else {
escapeFailCnt = 0;
newOffset = i + 1;
i++;
}
}
continue;
}
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
long checkImmunityTime = transactionTimeout;
String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTimeStr)) {
newOffset = i + 1;
i++;
continue;
}
}
} else {
if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) {
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
}
List<MessageExt> opMsg = pullResult == null ? null : pullResult.getMsgFoundList();
boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime
|| opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout
|| valueOfCurrentMinusBorn <= -1;
if (isNeedCheck) {
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
putInQueueCount++;
log.info("Check transaction. real_topic={},uniqKey={},offset={},commitLogOffset={}",
msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC),
msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
msgExt.getQueueOffset(), msgExt.getCommitLogOffset());
listener.resolveHalfMsg(msgExt);
} else {
nextOpOffset = pullResult != null ? pullResult.getNextBeginOffset() : nextOpOffset;
pullResult = fillOpRemoveMap(removeMap, opQueue, nextOpOffset,
halfOffset, opMsgMap, doneOpOffset);
if (pullResult == null || pullResult.getPullStatus() == PullStatus.NO_NEW_MSG
|| pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL
|| pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
try {
Thread.sleep(SLEEP_WHILE_NO_OP);
} catch (Throwable ignored) {
}
} else {
log.info("The miss message offset:{}, pullOffsetOfOp:{}, miniOffset:{} get more opMsg.", i, nextOpOffset, halfOffset);
}
continue;
}
}
newOffset = i + 1;
i++;
}
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
GetResult getResult = getHalfMsg(messageQueue, newOffset);
pullResult = pullOpMsg(opQueue, newOpOffset, 1);
long maxMsgOffset = getResult.getPullResult() == null ? newOffset : getResult.getPullResult().getMaxOffset();
long maxOpOffset = pullResult == null ? newOpOffset : pullResult.getMaxOffset();
long msgTime = getResult.getMsg() == null ? System.currentTimeMillis() : getResult.getMsg().getStoreTimestamp();
log.info("After check, {} opOffset={} opOffsetDiff={} msgOffset={} msgOffsetDiff={} msgTime={} msgTimeDelayInMs={} putInQueueCount={}",
messageQueue, newOpOffset, maxOpOffset - newOpOffset, newOffset, maxMsgOffset - newOffset, new Date(msgTime),
System.currentTimeMillis() - msgTime, putInQueueCount);
}
} catch (Throwable e) {
log.error("Check error", e);
}
}