in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java [361:412]
public void notifyPullMessage(Session session, Subscription subscription, Queue queue) {
if (session == null || session.isDestroyed()) {
return;
}
if (subscriptionPersistManager == null) {
subscriptionPersistManager = SpringUtils.getBean(SubscriptionPersistManager.class);
}
if (subscriptionPersistManager != null &&
!session.isClean() && !session.isLoaded()) {
if (session.isLoading()) {
return;
}
session.setLoading();
CompletableFuture<Set<Subscription>> future = subscriptionPersistManager.loadSubscriptions(session.getClientId());
future.whenComplete((subscriptions, throwable) -> {
if (throwable != null) {
logger.error("", throwable);
scheduler.schedule(() -> {
session.resetLoad();
notifyPullMessage(session, subscription, queue);
}, 3, TimeUnit.SECONDS);
return;
}
session.addSubscription(subscriptions);
matchAction.addSubscription(session, subscriptions);
session.setLoaded();
notifyPullMessage(session, subscription, queue);
});
return;
}
if (queue != null) {
if (subscription == null) {
throw new RuntimeException(
"invalid notifyPullMessage, subscription is null, but queue is not null," + session.getClientId());
}
queueFresh.freshQueue(session, subscription);
pullMessage(session, subscription, queue);
return;
}
for (Subscription each : session.subscriptionSnapshot()) {
if (subscription != null && !each.equals(subscription)) {
continue;
}
queueFresh.freshQueue(session, each);
Map<Queue, QueueOffset> queueOffsets = session.getQueueOffset(each);
if (queueOffsets != null) {
for (Map.Entry<Queue, QueueOffset> entry : queueOffsets.entrySet()) {
pullMessage(session, each, entry.getKey());
}
}
}
}