private void pullToLast()

in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [291:322]


    private void pullToLast(DefaultLitePullConsumer consumer) throws Throwable {
        Set<MessageQueue> readyToRecover = consumer.assignment();
        for (MessageQueue messageQueue : readyToRecover) {
            this.recoveringQueueMutex.computeIfAbsent(messageQueue, messageQueue1 -> new CountDownLatch2(1));
        }

        List<MessageExt> holder = new ArrayList<>();
        //recover
        List<MessageExt> result = consumer.poll(50);
        while (result != null && result.size() != 0) {
            holder.addAll(result);
            if (holder.size() <= 1000) {
                result = consumer.poll(50);
                continue;
            }

            replayState(holder);
            holder.clear();

            result = consumer.poll(50);
        }
        if (holder.size() != 0) {
            replayState(holder);
        }

        //恢复完毕;
        Set<MessageQueue> recoverOver = consumer.assignment();
        for (MessageQueue messageQueue : recoverOver) {
            CountDownLatch2 waitPoint = this.recoveringQueueMutex.get(messageQueue);
            waitPoint.countDown();
        }
    }