in core/src/main/java/org/apache/rocketmq/streams/core/state/AbstractStore.java [163:181]
/**
 * Looks up the queue identifier under which the given key is currently tracked.
 *
 * <p>The {@code recover} map is searched first and the {@code calculating} map
 * second; the first queue whose key collection contains a byte array with the
 * same content as {@code key} wins.
 *
 * @param key the raw key bytes to look for; compared by content, not by reference
 * @return the map key (queue identifier) whose collection holds {@code key},
 *         or {@code null} when neither map tracks it
 */
public String whichStateTopicQueueBelongTo(byte[] key) {
    String owner = findQueueHolding(recover, key);
    return owner != null ? owner : findQueueHolding(calculating, key);
}

/**
 * Scans one queue-to-keys map and returns the first queue whose keys include {@code key}.
 *
 * @param keysByQueue map from queue identifier to the keys tracked for that queue
 * @param key         the raw key bytes to look for
 * @return the matching queue identifier, or {@code null} if no collection contains the key
 */
private String findQueueHolding(java.util.Map<String, ? extends Iterable<byte[]>> keysByQueue, byte[] key) {
    // Iterate entries directly: avoids a second lookup per queue and the
    // throwaway default collection the keySet()/getOrDefault() form allocated.
    for (java.util.Map.Entry<String, ? extends Iterable<byte[]>> entry : keysByQueue.entrySet()) {
        // Arrays do not override equals/hashCode, so content equality has to be
        // checked element by element with Arrays.equals rather than via contains().
        for (byte[] candidate : entry.getValue()) {
            if (Arrays.equals(candidate, key)) {
                return entry.getKey();
            }
        }
    }
    return null;
}