public void removeState()

in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [254:289]


    /**
     * Removes the locally held state that corresponds to the given source-topic queues.
     *
     * <p>The cleanup runs as a task on {@code this.executor}. For every state-topic queue derived
     * from {@code removeQueues}, the task deletes each key returned by {@code super.getAll(...)}
     * from {@code rocksDBStore}, calls {@code super.removeAll(...)} for that queue key, and finally
     * removes the queue from {@code recoveringQueueMutex}.
     *
     * <p>The calling thread waits at most 100 ms for the task. If the wait times out or the caller
     * is interrupted, this method returns normally and the task is left to finish on the executor.
     *
     * @param removeQueues source-topic queues whose state should be dropped; {@code null} or empty
     *                     is a no-op
     * @throws Throwable if the cleanup task fails within the wait window (surfaced by
     *                   {@code Future.get} as an {@code ExecutionException} wrapping the failure)
     */
    public void removeState(Set<MessageQueue> removeQueues) throws Throwable {
        if (removeQueues == null || removeQueues.isEmpty()) {
            return;
        }

        Future<?> future = this.executor.submit(() -> {
            try {
                // Re-checked on the executor thread: the caller-owned set may have been emptied
                // between submission and execution.
                if (removeQueues.isEmpty()) {
                    return;
                }
                Set<MessageQueue> stateTopicQueue = convertSourceTopicQueue2StateTopicQueue(removeQueues);

                // Only the distinct queue keys are needed; a sequential stream is enough for this
                // small grouping and avoids borrowing common-pool threads from inside the executor.
                Map<String/*brokerName@topic@queueId*/, List<MessageQueue>> groupByUniqueQueue =
                        stateTopicQueue.stream().collect(Collectors.groupingBy(this::buildKey));
                for (String stateUniqueQueue : groupByUniqueQueue.keySet()) {
                    // Delete every stored key of this queue first, then drop the queue's key index.
                    Set<byte[]> stateTopicQueueKey = super.getAll(stateUniqueQueue);
                    for (byte[] key : stateTopicQueueKey) {
                        this.rocksDBStore.deleteByKey(key);
                    }
                    super.removeAll(stateUniqueQueue);
                }

                for (MessageQueue stateMessageQueue : stateTopicQueue) {
                    this.recoveringQueueMutex.remove(stateMessageQueue);
                }
            } catch (Throwable e) {
                logger.error("remove state error", e);
                throw new RuntimeException(e);
            }
        });

        try {
            future.get(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Keep the best-effort contract (return normally) but restore the interrupt flag so
            // the caller can still observe that it was interrupted.
            Thread.currentThread().interrupt();
        } catch (TimeoutException e) {
            // Deliberate best-effort wait: the task is not cancelled and keeps running on the executor.
            logger.warn("remove state did not finish within 100 ms, cleanup continues in background");
        }
    }