in core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java [80:96]
protected Set<MessageQueue> convertSourceTopicQueue2StateTopicQueue(Set<MessageQueue> messageQueues) {
if (messageQueues == null || messageQueues.size() == 0) {
return new HashSet<>();
}
HashSet<MessageQueue> result = new HashSet<>();
for (MessageQueue messageQueue : messageQueues) {
if (messageQueue.getTopic().endsWith(Constant.STATE_TOPIC_SUFFIX)) {
result.add(messageQueue);
continue;
}
MessageQueue queue = new MessageQueue(messageQueue.getTopic() + Constant.STATE_TOPIC_SUFFIX, messageQueue.getBrokerName(), messageQueue.getQueueId());
result.add(queue);
}
return result;
}