in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [291:322]
/**
 * Drains the given consumer and replays everything it returns into the local state, then
 * releases the recovery latch of every queue the consumer is assigned to.
 *
 * <p>Steps:
 * <ol>
 *   <li>Make sure each currently assigned queue has a latch in {@code recoveringQueueMutex}
 *       (an existing latch is kept).</li>
 *   <li>Poll repeatedly until a poll returns {@code null} or an empty list. Polled messages are
 *       buffered and handed to {@code replayState} once the buffer holds more than 1000
 *       messages; any remainder is replayed after the loop.</li>
 *   <li>Re-read the assignment and count down the latch of each queue in it.</li>
 * </ol>
 *
 * <p>NOTE(review): if {@code poll} or {@code replayState} throws, the latches are not counted
 * down by this method; confirm that callers handle that case.
 *
 * @param consumer the pull consumer to drain; its current assignment decides which latches are
 *                 created and released
 * @throws Throwable whatever {@code consumer.assignment()}, {@code consumer.poll(..)} or
 *                   {@code replayState(..)} throws
 */
private void pullToLast(DefaultLitePullConsumer consumer) throws Throwable {
    // Value passed to poll(..) on every call; presumably a timeout in milliseconds.
    final long pollArgument = 50;
    // Buffered messages are replayed once the buffer grows beyond this size.
    final int replayBatchThreshold = 1000;

    Set<MessageQueue> readyToRecover = consumer.assignment();
    for (MessageQueue messageQueue : readyToRecover) {
        this.recoveringQueueMutex.computeIfAbsent(messageQueue, ignored -> new CountDownLatch2(1));
    }

    // recover: keep polling until the consumer has nothing more to hand out
    List<MessageExt> holder = new ArrayList<>();
    List<MessageExt> result;
    while ((result = consumer.poll(pollArgument)) != null && !result.isEmpty()) {
        holder.addAll(result);
        if (holder.size() > replayBatchThreshold) {
            replayState(holder);
            holder.clear();
        }
    }
    // replay whatever is left below the batch threshold
    if (!holder.isEmpty()) {
        replayState(holder);
    }

    // recovery finished: release the waiters of every queue in the current assignment
    Set<MessageQueue> recoverOver = consumer.assignment();
    for (MessageQueue messageQueue : recoverOver) {
        CountDownLatch2 waitPoint = this.recoveringQueueMutex.get(messageQueue);
        // The assignment is re-read here, so it may contain a queue for which no latch was
        // registered above (or whose latch was removed meanwhile); skip it instead of
        // failing with a NullPointerException and leaving the remaining latches unreleased.
        if (waitPoint != null) {
            waitPoint.countDown();
        }
    }
}