public void freshQueue()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java [176:218]


    public void freshQueue(Subscription subscription, Set<Queue> queues) {
        if (subscription == null) {
            throw new RuntimeException("subscription is null");
        }
        if (queues == null) {
            logger.warn("queues is null when freshQueue,{},{}", getClientId(), subscription);
            return;
        }
        if (!subscriptions.containsKey(subscription.getTopicFilter())) {
            return;
        }

        String queueName = subscription.toQueueName();
        if (!offsetMap.containsKey(queueName)) {
            offsetMap.putIfAbsent(queueName, new ConcurrentHashMap<>(16));
        }
        for (Queue memQueue : offsetMap.get(queueName).keySet()) {
            if (!queues.contains(memQueue)) {
                offsetMap.get(queueName).remove(memQueue);
            }
        }
        // init queueOffset
        for (Queue nowQueue : queues) {
            if (!offsetMap.get(queueName).containsKey(nowQueue)) {
                QueueOffset queueOffset = new QueueOffset();
                //if no offset  use init offset
                offsetMap.get(queueName).put(nowQueue, queueOffset);
                this.markPersistOffsetFlag(true);
            }
        }

        if (!sendingMessages.containsKey(subscription)) {
            sendingMessages.putIfAbsent(subscription, new ConcurrentHashMap<>(16));
        }
        for (Queue memQueue : sendingMessages.get(subscription).keySet()) {
            if (!queues.contains(memQueue)) {
                sendingMessages.get(subscription).remove(memQueue);
            }
        }
        if (queues.isEmpty()) {
            logger.warn("queues is empty when freshQueue,{},{}", getClientId(), subscription);
        }
    }