in core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java [90:108]
/**
 * Blocks the calling thread until the recovery latch of the state-topic queue that
 * corresponds to {@code messageQueue} is released, or until a 5 second timeout elapses.
 *
 * <p>A timeout is not treated as a failure: the method returns normally and the caller
 * proceeds. The timeout is logged as an error so that it is not mistaken for a
 * completed recovery.
 *
 * @param messageQueue source-topic queue whose state-topic counterpart is awaited
 * @throws RecoverStateStoreThrowable if no latch is registered for the converted queue, or
 *         if waiting fails for any reason (including interruption); the underlying
 *         failure is attached as the cause
 */
public void waitIfNotReady(MessageQueue messageQueue) throws RecoverStateStoreThrowable {
    final long maxWaitMillis = 5000;
    final long slowWaitMillis = 2000;

    MessageQueue stateTopicQueue = convertSourceTopicQueue2StateTopicQueue(messageQueue);
    CountDownLatch2 waitPoint = this.recoveringQueueMutex.get(stateTopicQueue);
    if (waitPoint == null) {
        // Previously this surfaced as a wrapped NullPointerException with no context;
        // the thrown type is unchanged, only the cause is now descriptive.
        throw new RecoverStateStoreThrowable(
            new IllegalStateException("no recover latch registered for queue: " + stateTopicQueue));
    }

    // nanoTime is monotonic, so the measured duration is immune to wall-clock adjustments.
    final long startNanos = System.nanoTime();
    boolean released = false;
    try {
        released = waitPoint.await(maxWaitMillis, TimeUnit.MILLISECONDS);
    } catch (Throwable t) {
        if (t instanceof InterruptedException) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
        }
        throw new RecoverStateStoreThrowable(t);
    } finally {
        // Computed here so the elapsed time is valid on the failure path as well.
        long cost = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        if (!released) {
            logger.error("recover wait ended without completion, queue:" + stateTopicQueue
                + ", wait time:" + cost + " ms.");
        } else if (cost > slowWaitMillis) {
            logger.error("recover finish, consume time:" + cost + " ms.");
        }
    }
}