in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [254:289]
/**
 * Removes the locally stored state that belongs to the given source queues.
 *
 * <p>The cleanup runs as a task submitted to {@code this.executor}. The task maps the
 * source queues to their state-topic queues, deletes every key registered under each
 * state queue's unique key from {@code rocksDBStore}, drops the key registrations via
 * {@code super.removeAll}, and finally removes the state queues from
 * {@code recoveringQueueMutex}.
 *
 * <p>The calling thread waits at most 100 ms for the task. If the wait times out or the
 * caller is interrupted, this method returns without failing; the submitted task is not
 * cancelled, and any later failure is only visible through the "remove state error" log.
 *
 * @param removeQueues source queues whose state should be dropped; {@code null} or empty
 *                     is a no-op
 * @throws Throwable if the task fails while the caller is still waiting (the failure is
 *                   surfaced by {@code Future.get} as an {@code ExecutionException}), or
 *                   if the executor rejects the task
 */
public void removeState(Set<MessageQueue> removeQueues) throws Throwable {
    if (removeQueues == null || removeQueues.isEmpty()) {
        return;
    }

    Future<?> future = this.executor.submit(() -> {
        try {
            // Re-checked inside the task because it runs later on the executor thread and the
            // set is the caller's instance, not a copy.
            if (removeQueues.isEmpty()) {
                return;
            }

            Set<MessageQueue> stateTopicQueue = convertSourceTopicQueue2StateTopicQueue(removeQueues);

            // Distinct unique-queue keys (brokerName@topic@queueId); only the keys are needed,
            // so there is no reason to group the queues or to use a parallel stream here.
            Set<String> uniqueQueueKeys = stateTopicQueue.stream()
                    .map(this::buildKey)
                    .collect(Collectors.toSet());

            for (String stateUniqueQueue : uniqueQueueKeys) {
                Set<byte[]> stateTopicQueueKey = super.getAll(stateUniqueQueue);
                for (byte[] key : stateTopicQueueKey) {
                    this.rocksDBStore.deleteByKey(key);
                }
                super.removeAll(stateUniqueQueue);
            }

            for (MessageQueue stateMessageQueue : stateTopicQueue) {
                this.recoveringQueueMutex.remove(stateMessageQueue);
            }
        } catch (Throwable e) {
            logger.error("remove state error", e);
            throw new RuntimeException(e);
        }
    });

    try {
        future.get(100, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // Best-effort wait: do not fail the caller, but keep the interrupt visible to it.
        Thread.currentThread().interrupt();
        logger.warn("interrupted while waiting for remove state to finish", e);
    } catch (TimeoutException e) {
        // Best-effort wait: the task keeps running on the executor after we stop waiting.
        logger.warn("remove state did not finish within 100ms, it continues in background", e);
    }
}