in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/Session.java [176:218]
/**
 * Reconciles the per-queue state kept for {@code subscription} with the given set of queues.
 *
 * <p>Queues that are tracked but no longer present in {@code queues} are dropped from both the
 * offset map and the sending-messages map. Queues present in {@code queues} but not yet tracked
 * get a fresh {@link QueueOffset}, and the persist-offset flag is marked for each one added.
 *
 * <p>The call is a no-op when {@code queues} is {@code null} (a warning is logged) or when the
 * subscription's topic filter is not registered in {@code subscriptions}.
 *
 * @param subscription the subscription whose queue state is refreshed; must not be {@code null}
 * @param queues       the current set of queues for the subscription; may be empty
 * @throws RuntimeException if {@code subscription} is {@code null}
 */
public void freshQueue(Subscription subscription, Set<Queue> queues) {
    if (subscription == null) {
        throw new RuntimeException("subscription is null");
    }
    if (queues == null) {
        logger.warn("queues is null when freshQueue,{},{}", getClientId(), subscription);
        return;
    }
    if (!subscriptions.containsKey(subscription.getTopicFilter())) {
        return;
    }

    // Resolve the per-subscription offset map once and keep the reference: re-reading it from
    // offsetMap on every access could observe null if the entry is removed in between.
    String queueName = subscription.toQueueName();
    Map<Queue, QueueOffset> queueOffsets =
        offsetMap.computeIfAbsent(queueName, key -> new ConcurrentHashMap<>(16));

    // Drop offsets of queues that are no longer assigned. retainAll removes through the
    // key-set iterator, so it is safe whatever Map implementation backs the entry.
    queueOffsets.keySet().retainAll(queues);

    // init queueOffset
    for (Queue nowQueue : queues) {
        if (!queueOffsets.containsKey(nowQueue)) {
            // if no offset use init offset
            queueOffsets.put(nowQueue, new QueueOffset());
            this.markPersistOffsetFlag(true);
        }
    }

    // Same pruning for in-flight messages; the entry is created if it does not exist yet.
    sendingMessages.computeIfAbsent(subscription, key -> new ConcurrentHashMap<>(16))
        .keySet()
        .retainAll(queues);

    if (queues.isEmpty()) {
        logger.warn("queues is empty when freshQueue,{},{}", getClientId(), subscription);
    }
}