in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java [286:311]
/**
 * Adds the given subscriptions to the session, invokes {@code queueFresh.freshQueue} for each
 * of them, and then calls {@code initOffset} once for every queue/offset entry the session
 * reports for those subscriptions.
 *
 * <p>Returns immediately, without touching {@code future}, when {@code session} or
 * {@code subscriptions} is {@code null}.
 *
 * @param session       session the subscriptions are added to; nothing happens when {@code null}
 * @param subscriptions subscriptions to add; nothing happens when {@code null}
 * @param future        passed unchanged to every {@code initOffset} call together with the
 *                      shared counter; this method never completes it itself
 */
private void addSubscriptionAndInit(Session session, Set<Subscription> subscriptions,
                                    CompletableFuture<Void> future) {
    if (session == null || subscriptions == null) {
        return;
    }
    session.addSubscription(subscriptions);

    // Pass 1: refresh every subscription and sum the sizes of all offset maps, so the
    // counter already holds the full total before the first initOffset call is made.
    int total = 0;
    for (Subscription sub : subscriptions) {
        queueFresh.freshQueue(session, sub);
        Map<Queue, QueueOffset> offsets = session.getQueueOffset(sub);
        if (offsets == null) {
            continue;
        }
        total += offsets.size();
    }
    AtomicInteger pending = new AtomicInteger(total);

    // Pass 2: hand each queue/offset pair to initOffset along with the future and counter.
    // NOTE(review): when no offsets exist, initOffset is never called and the future is left
    // as-is; presumably initOffset is what completes it - confirm callers tolerate that.
    for (Subscription sub : subscriptions) {
        Map<Queue, QueueOffset> offsets = session.getQueueOffset(sub);
        if (offsets == null) {
            continue;
        }
        for (Map.Entry<Queue, QueueOffset> pair : offsets.entrySet()) {
            initOffset(session, sub, pair.getKey(), pair.getValue(), future, pending);
        }
    }
}