private void addSubscriptionAndInit()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java [286:311]


    private void addSubscriptionAndInit(Session session, Set<Subscription> subscriptions,
                                        CompletableFuture<Void> future) {
        if (session == null) {
            return;
        }
        if (subscriptions == null) {
            return;
        }
        session.addSubscription(subscriptions);
        AtomicInteger result = new AtomicInteger(0);
        for (Subscription subscription : subscriptions) {
            queueFresh.freshQueue(session, subscription);
            Map<Queue, QueueOffset> queueOffsets = session.getQueueOffset(subscription);
            if (queueOffsets != null) {
                result.addAndGet(queueOffsets.size());
            }
        }
        for (Subscription subscription : subscriptions) {
            Map<Queue, QueueOffset> queueOffsets = session.getQueueOffset(subscription);
            if (queueOffsets != null) {
                for (Map.Entry<Queue, QueueOffset> entry : queueOffsets.entrySet()) {
                    initOffset(session, subscription, entry.getKey(), entry.getValue(), future, result);
                }
            }
        }
    }