private void replayState()

in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [326:373]


    private void replayState(List<MessageExt> msgs) throws Throwable {
        if (msgs == null || msgs.size() == 0) {
            return;
        }

        Map<String/*brokerName@topic@queueId of state topic*/, List<MessageExt>> groupByQueueId = msgs.stream().parallel().collect(Collectors.groupingBy(this::buildKey));

        for (String uniqueQueue : groupByQueueId.keySet()) {
            List<MessageExt> messageExts = groupByQueueId.get(uniqueQueue);
            Map<String/*K的hashcode,真正的key在body里面*/, List<MessageExt>> groupByKeyHashcode = messageExts.stream().parallel().collect(Collectors.groupingBy(MessageExt::getKeys));

            for (String keyHashcode : groupByKeyHashcode.keySet()) {
                //相同brokerName@topic@queueId + keyHashcode 在一次拉取中的所有数据
                List<MessageExt> exts = groupByKeyHashcode.get(keyHashcode);

                //取最大queueOffset的消息,按照queueOffset,相同key,大的queueOffset覆盖小的queueOffset
                MessageExt result = exts.stream()
                        .max(Comparator.comparingLong(MessageExt::getQueueOffset))
                        .orElse(null);

                if (result == null) {
                    continue;
                }

                String emptyBody = result.getUserProperty(Constant.EMPTY_BODY);
                if (Constant.TRUE.equals(emptyBody)) {
                    continue;
                }

                byte[] body = result.getBody();
                Pair<byte[], byte[]> pair = this.protocol.split(body);

                byte[] key = pair.getKey();
                byte[] value = pair.getValue();

                //放入rocksdb
                MessageQueue stateTopicQueue = new MessageQueue(result.getTopic(), result.getBrokerName(), result.getQueueId());
                try {
                    logger.debug("recover state, key: " + new String(key, StandardCharsets.UTF_8) + ", stateTopicQueue: " + stateTopicQueue);
                } catch (Throwable t) {
                }

                String stateTopicQueueKey = buildKey(stateTopicQueue);
                super.putInRecover(stateTopicQueueKey, key);
                this.rocksDBStore.put(key, value);
            }
        }
    }