in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java [173:224]
/**
 * Walks every entry currently in {@code retryCache} and, for entries whose last attempt is at
 * least {@code messageRetryInterval} milliseconds old, either re-sends the message or gives up
 * on it.
 *
 * <ul>
 *   <li>PUBLISH entries: dropped when the session is missing or destroyed; handed to
 *       {@code saveRetryQueue} and then cleaned when {@code mountTimeout()} reports true;
 *       otherwise re-written through {@code pushAction.write}.</li>
 *   <li>PUBREL entries: invalidated (and, when a session exists, rolled forward via
 *       {@code pushAction.rollNextByAck}) when the session is missing or destroyed or
 *       {@code mountRelTimeout()} reports true; otherwise a new PUBREL packet is written to the
 *       session's channel.</li>
 *   <li>Any other message type is only logged as an error.</li>
 * </ul>
 *
 * A failure while handling one entry is logged and does not stop the remaining entries from
 * being processed. This method never throws an {@link Exception} to its caller.
 */
private void doRetryCache() {
    try {
        for (Map.Entry<String, RetryMessage> entry : retryCache.asMap().entrySet()) {
            try {
                RetryMessage retryMessage = entry.getValue();
                Message message = retryMessage.message;
                Session session = retryMessage.session;
                int mqttMsgId = retryMessage.mqttMsgId;
                // Not due yet: the previous attempt is more recent than the retry interval.
                if (System.currentTimeMillis() - retryMessage.timestamp < messageRetryInterval) {
                    continue;
                }
                if (MqttMessageType.PUBLISH.equals(retryMessage.mqttMessageType)) {
                    if (session == null) {
                        // Without a session there is no channel id to pass to cleanRetryMessage,
                        // and dereferencing it would throw and leave the entry in the cache to
                        // fail again on every pass. Remove the entry by its cache key instead.
                        retryCache.invalidate(entry.getKey());
                        continue;
                    }
                    if (session.isDestroyed()) {
                        cleanRetryMessage(mqttMsgId, session.getChannelId());
                        continue;
                    }
                    if (retryMessage.mountTimeout()) {
                        // Hand the message over before removing it from the local retry cache.
                        saveRetryQueue(entry.getKey(), retryMessage);
                        cleanRetryMessage(mqttMsgId, session.getChannelId());
                        continue;
                    }
                    pushAction.write(session, message, mqttMsgId, retryMessage.qos, retryMessage.subscription);
                    retryMessage.timestamp = System.currentTimeMillis();
                    retryMessage.localRetryTime++;
                } else if (MqttMessageType.PUBREL.equals(retryMessage.mqttMessageType)) {
                    if (session == null || session.isDestroyed() || retryMessage.mountRelTimeout()) {
                        retryCache.invalidate(entry.getKey());
                        // session may be null here, so the client id is resolved null-safely.
                        logger.error("failed to retry pub rel more times,{},{}",
                            session == null ? null : session.getClientId(), mqttMsgId);
                        if (session != null) {
                            pushAction.rollNextByAck(session, mqttMsgId);
                        }
                        continue;
                    }
                    MqttFixedHeader pubRelMqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false,
                        MqttQoS.valueOf(retryMessage.qos), false, 0);
                    MqttMessage pubRelMqttMessage = new MqttMessage(pubRelMqttFixedHeader,
                        MqttMessageIdVariableHeader.from(mqttMsgId));
                    session.getChannel().writeAndFlush(pubRelMqttMessage);
                    retryMessage.localRetryTime++;
                    retryMessage.timestamp = System.currentTimeMillis();
                    logger.warn("retryPubRel:{},{}", session.getClientId(), mqttMsgId);
                } else {
                    logger.error("error retry message type:{}", retryMessage.mqttMessageType);
                }
            } catch (Exception e) {
                // Keep going: one bad entry must not block retries for the others.
                logger.error("", e);
            }
        }
    } catch (Exception e) {
        // Guards the iteration itself so a failure here does not propagate to the caller.
        logger.error("", e);
    }
}