private static Collection waitForMessageConsume()

in java/e2e-v4/src/main/java/org/apache/rocketmq/utils/VerifyUtils.java [365:403]


    private static Collection<MessageExt> waitForMessageConsume(DataCollector<MessageExt> enqueueMessages, DataCollector<MessageExt> dequeueMessages, Long timeoutMills, Integer consumedTimes) {
        logger.info("Set timeout: {}ms", timeoutMills);
        Collection<MessageExt> sendMessages = enqueueMessages.getAllData();

        long currentTime = System.currentTimeMillis();

        while (!sendMessages.isEmpty()) {
            //logger.info("param1:{}, param2:{}", enqueueMessages.getDataSize(), dequeueMessages.getDataSize());
            List<MessageExt> receivedMessagesCopy = new ArrayList<>(dequeueMessages.getAllData());
            Iterator<MessageExt> iter = sendMessages.iterator();
            while (iter.hasNext()) {
                MessageExt message = iter.next();
                long msgCount = receivedMessagesCopy
                        .stream()
                        .filter(msg -> {
                            if (msg.getUserProperty("UNIQ_KEY") != null && !msg.getUserProperty("UNIQ_KEY").equals(msg.getMsgId())) {
                                return msg.getUserProperty("UNIQ_KEY").equals(message.getMsgId()) || msg.getMsgId().equals(message.getMsgId());
                            }
                            return msg.getMsgId().equals(message.getMsgId());
                        })
                        .count();
                if (msgCount > 0 && getRepeatedTimes(receivedMessagesCopy, message) == consumedTimes) {
                    iter.remove();
                } else if (getRepeatedTimes(receivedMessagesCopy, message) > consumedTimes) {
                    Assertions.fail(String.format("消费到的重试消息多于预期(包含一条原始消息),Except:%s, Actual:%s, MsgId:%s", consumedTimes, getRepeatedTimes(receivedMessagesCopy, message), message.getMsgId()));
                    //logger.error("消费到的重试消息多于预期,Except:{}, Actual:{}", consumedTimes, getRepeatedTimes(receivedMessagesCopy, message));
                }
            }
            if (sendMessages.isEmpty()) {
                break;
            }
            if (System.currentTimeMillis() - currentTime >= timeoutMills) {
                logger.error("Timeout but not received all send messages, not received msg: {}\n received msg:{}\n", sendMessages, receivedMessagesCopy);
                break;
            }
            TestUtils.waitForMoment(500L);
        }
        return sendMessages;
    }