public static void verifyDelayMessageWithReconsumeTimes()

in java/e2e-v4/src/main/java/org/apache/rocketmq/utils/VerifyUtils.java [187:211]


    /**
     * Verifies that every enqueued message was eventually dequeued and that no dequeued
     * message is reported by {@code checkDelay} as violating the delay requirement.
     *
     * <p>The wait budget starts at {@code TIMEOUT} and, for {@code reconsumeTime} values 1
     * through 4, is extended by the cumulative sum of 10, 30, 60 and 120 (one term per
     * reconsume attempt). Any other {@code reconsumeTime} leaves the budget at
     * {@code TIMEOUT}.
     *
     * <p>Fails the test (via {@code Assertions}) when messages remain unconsumed after the
     * wait, or when {@code checkDelay} returns a non-empty map.
     *
     * @param enqueueMessages collector holding the messages that were sent
     * @param dequeueMessages collector holding the messages that were received
     * @param delayTime       delay added to the wait budget; presumably seconds, since the
     *                        sum is multiplied by 1000 before being handed to the wait
     *                        helper — confirm
     * @param reconsumeTime   number of reconsume attempts to budget for (1 to 4 extend the
     *                        budget; other values add nothing)
     */
    public static void verifyDelayMessageWithReconsumeTimes(DataCollector<MessageExt> enqueueMessages, DataCollector<MessageExt> dequeueMessages, int delayTime, int reconsumeTime) {
        // Extra allowance per reconsume attempt; presumably the broker's retry back-off
        // intervals in seconds — confirm against the broker configuration.
        final int[] retryAllowances = {10, 30, 60, 120};

        int flexibleTime = TIMEOUT;
        if (reconsumeTime >= 1 && reconsumeTime <= retryAllowances.length) {
            for (int attempt = 0; attempt < reconsumeTime; attempt++) {
                flexibleTime += retryAllowances[attempt];
            }
        }

        // Check that consumption has completed within the (scaled) wait budget.
        long waitBudget = (flexibleTime + delayTime) * 1000L;
        Collection<MessageExt> notConsumed = waitForMessageConsume(enqueueMessages, dequeueMessages, waitBudget, 1);
        int notConsumedCount = notConsumed.size();
        if (notConsumedCount > 0) {
            Assertions.fail(String.format("以下%s条消息未被消费: %s", notConsumedCount, notConsumed));
        }

        // Check the consumption delay of the dequeued messages.
        // NOTE(review): the meaning of the 5 and 30 offsets is not visible here — confirm
        // against checkDelay and TIMEOUT.
        HashMap<String, Long> delayViolations = checkDelay(dequeueMessages, 5 + flexibleTime - 30);

        // Build the failure message up front; it is only shown if the assertion below fails.
        StringBuilder report = new StringBuilder("以下消息不符合延迟要求 \n");
        delayViolations.forEach((msg, interval) ->
            report.append(msg).append(" , interval:").append(interval).append("\n"));

        Assertions.assertEquals(0, delayViolations.size(), report.toString());
    }