in java/e2e-v4/src/main/java/org/apache/rocketmq/utils/VerifyUtils.java [187:211]
public static void verifyDelayMessageWithReconsumeTimes(DataCollector<MessageExt> enqueueMessages, DataCollector<MessageExt> dequeueMessages, int delayTime, int reconsumeTime) {
int flexibleTime = TIMEOUT;
if (reconsumeTime == 1) {
flexibleTime = flexibleTime + 10;
} else if (reconsumeTime == 2) {
flexibleTime = flexibleTime + 10 + 30;
} else if (reconsumeTime == 3) {
flexibleTime = flexibleTime + 10 + 30 + 60;
} else if (reconsumeTime == 4) {
flexibleTime = flexibleTime + 10 + 30 + 60 + 120;
}
//检查是否消费完成
Collection<MessageExt> unConsumedMessages = waitForMessageConsume(enqueueMessages, dequeueMessages, (flexibleTime + delayTime) * 1000L, 1);
if (unConsumedMessages.size() > 0) {
Assertions.fail(String.format("以下%s条消息未被消费: %s", unConsumedMessages.size(), unConsumedMessages));
}
//检查是否消费延迟性
HashMap<String, Long> delayUnExcept = checkDelay(dequeueMessages, 5 + flexibleTime - 30);
StringBuilder sb = new StringBuilder();
sb.append("以下消息不符合延迟要求 \n");
for (String msg : delayUnExcept.keySet()) {
sb.append(msg).append(" , interval:").append(delayUnExcept.get(msg)).append("\n");
}
Assertions.assertEquals(0, delayUnExcept.size(), sb.toString());
}