in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/loop/SessionLoopImpl.java [161:194]
/**
 * Creates a {@link Session} for the given client on the given channel and registers it.
 *
 * <p>The call is a no-op when {@code clientId} is blank, when the channel is not active,
 * or when {@code sessionMap} already holds an entry for the channel's id.
 *
 * <p>Otherwise the new session is initialised with the client's P2P and retry
 * subscriptions, then stored in {@code sessionMap} (keyed by channel id) and in
 * {@code clientMap} (keyed by client id, then channel id). If the channel is no longer
 * active after registration, the session is unloaded again. For a session that is not
 * clean, a message pull is triggered.
 *
 * @param clientId the client identifier; blank values are ignored
 * @param channel  the channel the session is bound to
 */
public void loadSession(String clientId, Channel channel) {
    if (StringUtils.isBlank(clientId) || !channel.isActive()) {
        return;
    }
    String channelId = ChannelInfo.getId(channel);
    if (sessionMap.containsKey(channelId)) {
        // A session is already registered for this channel.
        return;
    }

    Session session = new Session();
    session.setClientId(clientId);
    session.setChannelId(channelId);
    session.setChannel(channel);

    // Every session starts with the client's P2P and retry subscriptions.
    addSubscriptionAndInit(session,
        new HashSet<>(
            Arrays.asList(Subscription.newP2pSubscription(clientId), Subscription.newRetrySubscription(clientId))),
        ChannelInfo.getFuture(channel, ChannelInfo.FUTURE_CONNECT));

    synchronized (this) {
        sessionMap.put(channelId, session);
        // Single get-or-create lookup; the returned per-client map is used directly
        // instead of being fetched again with a second get().
        clientMap.computeIfAbsent(clientId, key -> new ConcurrentHashMap<>(2)).put(channelId, session);
    }

    // The channel may have been closed while the session was being set up;
    // undo the registration in that case.
    if (!channel.isActive()) {
        unloadSession(clientId, channelId);
        return;
    }

    if (!session.isClean()) {
        notifyPullMessage(session, null, null);
    }
}