public void loadState()

in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [216:252]


    public void loadState(Set<MessageQueue> addQueues) throws Throwable {
        if (addQueues == null || addQueues.size() == 0) {
            return;
        }

        final DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(StreamConfig.ROCKETMQ_STREAMS_STATE_CONSUMER_GROUP);
        consumer.setNamesrvAddr(properties.getProperty(MixAll.NAMESRV_ADDR_PROPERTY));
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setAutoCommit(false);
        consumer.start();

        Set<MessageQueue> stateTopicQueue = convertSourceTopicQueue2StateTopicQueue(addQueues);
        for (MessageQueue messageQueue : stateTopicQueue) {
            createStateTopic(messageQueue.getTopic(), messageQueue.getBrokerName().equals(Constant.STATIC_TOPIC_BROKER_NAME));
        }

        consumer.assign(stateTopicQueue);
        for (MessageQueue queue : stateTopicQueue) {
            consumer.seekToBegin(queue);
        }

        Future<?> future = this.executor.submit(() -> {
            try {
                pullToLast(consumer);
            } catch (Throwable e) {
                logger.error("pull to last error.", e);
                throw new RuntimeException(e);
            } finally {
                consumer.shutdown();
            }
        });

        try {
            future.get(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | TimeoutException e) {
        }
    }