in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [326:373]
/**
 * Replays one batch of messages pulled from the state topic into the local RocksDB store.
 *
 * <p>The batch is grouped first by the key produced by {@code buildKey(MessageExt)}
 * (brokerName@topic@queueId of the state topic) and then by {@link MessageExt#getKeys()}
 * (the hash code of the state key; the real key is carried inside the message body).
 * Within each group only the message with the largest queue offset is applied, so a newer
 * record overrides older ones from the same pull. A winning message whose
 * {@code Constant.EMPTY_BODY} user property equals {@code Constant.TRUE} is skipped and
 * nothing is written for that group.
 *
 * @param msgs messages pulled from the state topic; {@code null} or empty is a no-op
 * @throws Throwable any failure raised while splitting the body, registering the key for
 *                   recovery, or writing to RocksDB is propagated unchanged
 */
private void replayState(List<MessageExt> msgs) throws Throwable {
    if (msgs == null || msgs.isEmpty()) {
        return;
    }

    // Sequential grouping: the resulting groups (and the order inside each list) are the same
    // as with a parallel stream, without fork/join overhead for a single pulled batch.
    // key: brokerName@topic@queueId of the state topic
    Map<String, List<MessageExt>> groupByQueueId =
            msgs.stream().collect(Collectors.groupingBy(this::buildKey));

    for (List<MessageExt> queueMessages : groupByQueueId.values()) {
        // key: hash code of K; the real key lives in the message body.
        // NOTE(review): two different keys sharing a hash code land in the same group and only
        // one of them survives the max-offset selection below - confirm this is acceptable.
        Map<String, List<MessageExt>> groupByKeyHashcode =
                queueMessages.stream().collect(Collectors.groupingBy(MessageExt::getKeys));

        // Each list holds every message of this pull with the same
        // brokerName@topic@queueId + key hash code.
        for (List<MessageExt> exts : groupByKeyHashcode.values()) {
            // For the same key the larger queueOffset overrides the smaller one,
            // so keep only the message with the maximum queueOffset.
            MessageExt result = exts.stream()
                    .max(Comparator.comparingLong(MessageExt::getQueueOffset))
                    .orElse(null);
            if (result == null) {
                continue;
            }

            // NOTE(review): an empty-body record is only skipped here; a value written for
            // this key by an earlier batch is not removed - verify that is intended.
            if (Constant.TRUE.equals(result.getUserProperty(Constant.EMPTY_BODY))) {
                continue;
            }

            Pair<byte[], byte[]> pair = this.protocol.split(result.getBody());
            byte[] key = pair.getKey();
            byte[] value = pair.getValue();

            MessageQueue stateTopicQueue =
                    new MessageQueue(result.getTopic(), result.getBrokerName(), result.getQueueId());

            // Build the log line only when it will be emitted; render the key null-safely so
            // logging itself cannot fail (this replaces a catch-all that swallowed Throwable).
            if (logger.isDebugEnabled()) {
                String readableKey = key == null ? "null" : new String(key, StandardCharsets.UTF_8);
                logger.debug("recover state, key: " + readableKey + ", stateTopicQueue: " + stateTopicQueue);
            }

            // Record the key against its state-topic queue, then store it in RocksDB.
            super.putInRecover(buildKey(stateTopicQueue), key);
            this.rocksDBStore.put(key, value);
        }
    }
}