in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [216:252]
/**
 * Starts loading state for the given source queues.
 *
 * <p>A dedicated pull consumer is started, the source queues are mapped to their
 * state-topic queues, each state topic is created, and the consumer is assigned to
 * those queues and rewound to the beginning. The actual pulling ({@code pullToLast})
 * is submitted to {@code executor}; this method waits at most 100 ms for it and then
 * returns, leaving the task running in the background if it has not finished.
 *
 * <p>Ownership of the consumer passes to the background task once it is submitted;
 * the task shuts the consumer down when it ends. If anything fails before the task
 * is submitted, the consumer is shut down here so that it is not leaked.
 *
 * @param addQueues source-topic queues whose state should be loaded; {@code null}
 *                  or empty is a no-op
 * @throws Throwable any failure while preparing the consumer, or an
 *                   {@link java.util.concurrent.ExecutionException} if the
 *                   background task fails within the 100 ms wait
 */
public void loadState(Set<MessageQueue> addQueues) throws Throwable {
    if (addQueues == null || addQueues.isEmpty()) {
        return;
    }

    final DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(StreamConfig.ROCKETMQ_STREAMS_STATE_CONSUMER_GROUP);
    consumer.setNamesrvAddr(properties.getProperty(MixAll.NAMESRV_ADDR_PROPERTY));
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.setAutoCommit(false);
    consumer.start();

    Future<?> future;
    try {
        Set<MessageQueue> stateTopicQueue = convertSourceTopicQueue2StateTopicQueue(addQueues);
        for (MessageQueue messageQueue : stateTopicQueue) {
            createStateTopic(messageQueue.getTopic(), messageQueue.getBrokerName().equals(Constant.STATIC_TOPIC_BROKER_NAME));
        }

        consumer.assign(stateTopicQueue);
        for (MessageQueue queue : stateTopicQueue) {
            consumer.seekToBegin(queue);
        }

        future = this.executor.submit(() -> {
            try {
                pullToLast(consumer);
            } catch (Throwable e) {
                logger.error("pull to last error.", e);
                throw new RuntimeException(e);
            } finally {
                consumer.shutdown();
            }
        });
    } catch (Throwable e) {
        // The background task never took ownership, so nobody else will stop the
        // started consumer. Keep the original failure as the primary exception.
        try {
            consumer.shutdown();
        } catch (Throwable shutdownError) {
            e.addSuppressed(shutdownError);
        }
        throw e;
    }

    try {
        future.get(100, TimeUnit.MILLISECONDS);
    } catch (TimeoutException ignored) {
        // Best effort: the task was not cancelled, so loading continues in the
        // background and the consumer is shut down by the task itself.
    } catch (InterruptedException e) {
        // Do not swallow the interrupt; let the caller observe it.
        Thread.currentThread().interrupt();
    }
}