in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java [133:171]
/**
 * Stores a copy of the given retry message into the client's retry queue and,
 * once stored, schedules a pull notification for that queue.
 *
 * <p>Flow, as implemented below:
 * <ol>
 *   <li>Copy the message and set its first topic to
 *       {@code lmqQueueStore.getClientRetryTopic()}.</li>
 *   <li>If the copy's retry count has reached {@code connectConf.getMaxRetryTime()},
 *       only call {@code pushAction.rollNext} and return without storing.</li>
 *   <li>Otherwise put the copy into the queue named by the client's retry subscription.</li>
 *   <li>If the store future completes exceptionally, put {@code retryMessage} back into
 *       {@code retryCache} under {@code key}.</li>
 *   <li>If it completes normally, call {@code pushAction.rollNext} and schedule, after
 *       {@code scheduleDelaySecs} seconds, a {@code notifyPullMessage} for every session
 *       of the client on each refreshed queue whose broker name equals the broker that
 *       stored the message.</li>
 * </ol>
 *
 * @param key          cache key under which {@code retryMessage} is re-inserted into
 *                     {@code retryCache} when the store operation fails
 * @param retryMessage holder of the message, its session and its MQTT message id
 */
private void saveRetryQueue(String key, RetryMessage retryMessage) {
    Message message = retryMessage.message.copy();
    message.setFirstTopic(lmqQueueStore.getClientRetryTopic());
    Session session = retryMessage.session;
    int mqttMsgId = retryMessage.mqttMsgId;
    String clientId = session.getClientId();

    // Retry budget exhausted: do not store again, just advance via rollNext.
    if (message.getRetry() >= connectConf.getMaxRetryTime()) {
        pushAction.rollNext(session, mqttMsgId);
        return;
    }

    String retryQueue = Subscription.newRetrySubscription(clientId).toQueueName();
    CompletableFuture<StoreResult> result =
        lmqQueueStore.putMessage(new HashSet<>(Arrays.asList(retryQueue)), message);

    // NOTE(review): the future returned by whenComplete is discarded, so any exception
    // thrown inside this callback (e.g. an NPE if storeResult or its queue were null)
    // is not surfaced anywhere -- confirm putMessage never completes with a null result.
    result.whenComplete((storeResult, throwable) -> {
        if (throwable != null) {
            // Store failed: put the message back in the cache under the same key.
            retryCache.put(key, retryMessage);
            return;
        }

        // Only the broker name is needed later, to pick the matching queue(s).
        String brokerName = storeResult.getQueue().getBrokerName();
        pushAction.rollNext(session, mqttMsgId);

        scheduler.schedule(() -> {
            Subscription subscription = Subscription.newRetrySubscription(clientId);
            List<Session> sessionList = sessionLoop.getSessionList(clientId);
            if (sessionList == null) {
                return;
            }
            for (Session eachSession : sessionList) {
                Set<Queue> set = queueFresh.freshQueue(eachSession, subscription);
                if (set == null || set.isEmpty()) {
                    continue;
                }
                for (Queue queue : set) {
                    // Notify only for queues on the broker that stored the message.
                    if (Objects.equals(queue.getBrokerName(), brokerName)) {
                        sessionLoop.notifyPullMessage(eachSession, subscription, queue);
                    }
                }
            }
        }, scheduleDelaySecs, TimeUnit.SECONDS);
    });
}