public void waitIfNotReady()

in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [90:108]


    public void waitIfNotReady(MessageQueue messageQueue) throws RecoverStateStoreThrowable {
        MessageQueue stateTopicQueue = convertSourceTopicQueue2StateTopicQueue(messageQueue);
        CountDownLatch2 waitPoint = this.recoveringQueueMutex.get(stateTopicQueue);

        long start = 0;
        long end = 0;
        try {
            start = System.currentTimeMillis();
            waitPoint.await(5000, TimeUnit.MILLISECONDS);
            end = System.currentTimeMillis();
        } catch (Throwable t) {
            throw new RecoverStateStoreThrowable(t);
        } finally {
            long cost = end - start;
            if (cost > 2000) {
                logger.error("recover finish, consume time:" + cost + " ms.");
            }
        }
    }