public void persist()

in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [174:214]


    /**
     * Publishes the locally stored value of every tracked key for the given queues
     * to the matching state-topic queue.
     *
     * <p>The supplied queues are first mapped to state-topic queues via
     * {@code convertSourceTopicQueue2StateTopicQueue}. For each state-topic queue the
     * keys returned by {@code getInCalculating} are read from the RocksDB store,
     * merged with their value by {@code protocol.merge}, and sent to that queue. Keys
     * without a stored value are skipped. Once every key of a queue has been sent,
     * the queue's tracked key set is dropped with {@code removeCalculating}; if a send
     * throws, the exception propagates and the tracked set is left untouched.
     *
     * @param messageQueues queues whose state should be persisted; {@code null} or
     *                      empty is a no-op
     * @throws Throwable any failure raised while creating the state topic, reading the
     *                   local store, or sending a message
     */
    public void persist(Set<MessageQueue> messageQueues) throws Throwable {
        if (messageQueues == null || messageQueues.isEmpty()) {
            return;
        }

        Set<MessageQueue> stateTopicQueues = convertSourceTopicQueue2StateTopicQueue(messageQueues);
        for (MessageQueue stateTopicQueue : stateTopicQueues) {
            String stateTopicQueueKey = buildKey(stateTopicQueue);
            Set<byte[]> keySet = super.getInCalculating(stateTopicQueueKey);

            if (keySet == null || keySet.isEmpty()) {
                continue;
            }

            String stateTopic = stateTopicQueue.getTopic();
            // Constant-first comparison: a queue without a broker name is simply "not static"
            // instead of failing with a NullPointerException.
            boolean isStaticTopic = Constant.STATIC_TOPIC_BROKER_NAME.equals(stateTopicQueue.getBrokerName());
            createStateTopic(stateTopic, isStaticTopic);

            for (byte[] key : keySet) {
                byte[] valueBytes = this.rocksDBStore.get(key);
                if (valueBytes == null) {
                    // Nothing stored locally for this key, so there is nothing to publish.
                    continue;
                }

                byte[] body = this.protocol.merge(key, valueBytes);

                Message message = new Message(stateTopic, body);
                message.setKeys(Utils.toHexString(key));

                // Only pay for decoding/concatenation when debug output is actually enabled.
                // The key may not be text; decoding with an explicit Charset substitutes
                // replacement characters for malformed bytes rather than throwing, so no
                // catch-all is needed around the log statement.
                if (logger.isDebugEnabled()) {
                    logger.debug("persist key: " + new String(key, StandardCharsets.UTF_8) + ",messageQueue: " + stateTopicQueue);
                }

                // NOTE(review): the value returned by send(...) is ignored; only a thrown
                // exception aborts persistence. Confirm that is sufficient for this producer.
                this.producer.send(message, stateTopicQueue);
            }
            // Reached only when every key of this queue was handled without an exception.
            super.removeCalculating(stateTopicQueueKey);
        }
    }