in java/e2e-v4/src/main/java/org/apache/rocketmq/utils/VerifyUtils.java [213:229]
/**
 * Verifies that every enqueued message ends up consumed, allowing extra wait
 * time that grows with the expected number of reconsume attempts.
 *
 * <p>The base wait is {@code TIMEOUT}; for reconsume counts 1 through 4 a
 * cumulative allowance of 10, 40, 100 or 220 is added on top. Any other count
 * adds nothing. NOTE(review): the increments (10/30/60/120) look like the
 * broker's retry back-off steps in seconds — confirm against the broker config.
 *
 * @param enqueueMessages collector holding the messages that were sent
 * @param dequeueMessages collector holding the messages that were received
 * @param reconsumeTime   expected number of reconsume attempts; only 1..4
 *                        enlarge the wait window
 */
public static void verifyNormalMessageWithReconsumeTimes(DataCollector<MessageExt> enqueueMessages, DataCollector<MessageExt> dequeueMessages, int reconsumeTime) {
    // Cumulative extra allowance for the expected number of reconsume rounds.
    final int extraAllowance;
    switch (reconsumeTime) {
        case 1:
            extraAllowance = 10;
            break;
        case 2:
            extraAllowance = 10 + 30;
            break;
        case 3:
            extraAllowance = 10 + 30 + 60;
            break;
        case 4:
            extraAllowance = 10 + 30 + 60 + 120;
            break;
        default:
            // Counts outside 1..4 get no additional allowance.
            extraAllowance = 0;
            break;
    }

    // Scaled by 1000 before being handed to the wait helper
    // (presumably seconds -> milliseconds; verify against waitForMessageConsume).
    final long waitBudget = (TIMEOUT + extraAllowance) * 1000L;

    // Check whether consumption has completed.
    Collection<MessageExt> leftovers = waitForMessageConsume(enqueueMessages, dequeueMessages, waitBudget, 1);
    if (!leftovers.isEmpty()) {
        Assertions.fail(String.format("以下%s条消息未被消费: %s", leftovers.size(), leftovers));
    }
}