in java/e2e-v4/src/main/java/org/apache/rocketmq/utils/VerifyUtils.java [365:403]
/**
 * Polls the received-message collector until every sent message has been seen the expected
 * number of times, or until the timeout elapses.
 *
 * <p>On each pass a snapshot of {@code dequeueMessages} is taken and every remaining sent message
 * is checked against it. A sent message is removed from the pending collection once it is matched
 * (by message id, or by the received message's {@code UNIQ_KEY} user property) and its repeat
 * count, as reported by {@code getRepeatedTimes}, equals {@code consumedTimes}. If the repeat
 * count exceeds {@code consumedTimes}, the check is aborted through {@code Assertions.fail}.
 * Between passes the method waits 500 ms.
 *
 * <p>NOTE(review): messages are removed through the iterator of the collection returned by
 * {@code enqueueMessages.getAllData()}; whether that is the collector's backing collection or a
 * copy is not visible here - confirm before relying on the collector's contents afterwards.
 *
 * @param enqueueMessages collector holding the messages that were sent
 * @param dequeueMessages collector holding the messages that were received
 * @param timeoutMills    maximum time to keep polling, in milliseconds
 * @param consumedTimes   number of times each sent message is expected to be received
 * @return the sent messages that were still unmatched when polling stopped; empty when every
 *         message was received the expected number of times
 */
private static Collection<MessageExt> waitForMessageConsume(DataCollector<MessageExt> enqueueMessages, DataCollector<MessageExt> dequeueMessages, Long timeoutMills, Integer consumedTimes) {
    logger.info("Set timeout: {}ms", timeoutMills);
    Collection<MessageExt> sendMessages = enqueueMessages.getAllData();
    long startTime = System.currentTimeMillis();
    while (!sendMessages.isEmpty()) {
        // Work on a snapshot so every check in this pass sees the same set of received messages.
        List<MessageExt> receivedMessagesCopy = new ArrayList<>(dequeueMessages.getAllData());
        Iterator<MessageExt> iter = sendMessages.iterator();
        while (iter.hasNext()) {
            MessageExt message = iter.next();
            // A received message matches when either its UNIQ_KEY property or its own id
            // equals the id of the sent message.
            boolean received = receivedMessagesCopy
                .stream()
                .anyMatch(msg -> {
                    String uniqKey = msg.getUserProperty("UNIQ_KEY");
                    return (uniqKey != null && uniqKey.equals(message.getMsgId()))
                        || msg.getMsgId().equals(message.getMsgId());
                });
            // Computed once per message per pass. Held as a primitive long so the comparisons
            // with the boxed consumedTimes below are always numeric, never reference-based.
            long repeatedTimes = getRepeatedTimes(receivedMessagesCopy, message);
            if (received && repeatedTimes == consumedTimes) {
                iter.remove();
            } else if (repeatedTimes > consumedTimes) {
                Assertions.fail(String.format("消费到的重试消息多于预期(包含一条原始消息),Except:%s, Actual:%s, MsgId:%s", consumedTimes, repeatedTimes, message.getMsgId()));
            }
        }
        // Everything matched: return immediately instead of sleeping one more interval.
        if (sendMessages.isEmpty()) {
            break;
        }
        if (System.currentTimeMillis() - startTime >= timeoutMills) {
            logger.error("Timeout but not received all send messages, not received msg: {}\n received msg:{}\n", sendMessages, receivedMessagesCopy);
            break;
        }
        TestUtils.waitForMoment(500L);
    }
    return sendMessages;
}