in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [174:214]
/**
 * Publishes the locally stored value of every key currently tracked as
 * "in calculating" for the state-topic queues derived from {@code messageQueues}.
 *
 * <p>For each derived state-topic queue this method looks up the tracked key set,
 * makes sure the state topic exists via {@code createStateTopic}, reads each key's
 * value from the RocksDB store, merges key and value into one message body and
 * sends it to that exact queue. Keys with no stored value are skipped. The
 * tracking entry for a queue is removed only after all of its keys were sent,
 * so an exception thrown part-way leaves the entry in place.
 *
 * @param messageQueues queues to persist state for; {@code null} or empty is a no-op.
 *                      NOTE(review): presumably source-topic queues, judging by the
 *                      name of the conversion helper - confirm against callers.
 * @throws Throwable anything raised while creating the state topic, reading the
 *                   store, or sending a message
 */
public void persist(Set<MessageQueue> messageQueues) throws Throwable {
    if (messageQueues == null || messageQueues.isEmpty()) {
        return;
    }

    Set<MessageQueue> stateTopicQueues = convertSourceTopicQueue2StateTopicQueue(messageQueues);
    for (MessageQueue stateTopicQueue : stateTopicQueues) {
        String stateTopicQueueKey = buildKey(stateTopicQueue);
        Set<byte[]> keySet = super.getInCalculating(stateTopicQueueKey);
        if (keySet == null || keySet.isEmpty()) {
            continue;
        }

        String stateTopic = stateTopicQueue.getTopic();
        boolean isStaticTopic = stateTopicQueue.getBrokerName().equals(Constant.STATIC_TOPIC_BROKER_NAME);
        createStateTopic(stateTopic, isStaticTopic);

        for (byte[] key : keySet) {
            byte[] valueBytes = this.rocksDBStore.get(key);
            if (valueBytes == null) {
                // Nothing stored for this key, so there is nothing to publish.
                continue;
            }

            byte[] body = this.protocol.merge(key, valueBytes);
            Message message = new Message(stateTopic, body);
            message.setKeys(Utils.toHexString(key));

            // Only pay for decoding the key when debug output is enabled. Decoding
            // with an explicit charset never throws on non-text bytes (malformed
            // input is replaced), so no catch-all is needed around the log call.
            if (logger.isDebugEnabled()) {
                logger.debug("persist key: " + new String(key, StandardCharsets.UTF_8) + ",messageQueue: " + stateTopicQueue);
            }

            this.producer.send(message, stateTopicQueue);
        }

        // Clear the tracking entry only after every key of this queue was sent.
        super.removeCalculating(stateTopicQueueKey);
    }
}