private void doRetryCache()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/RetryDriver.java [173:224]


    private void doRetryCache() {
        try {
            for (Map.Entry<String, RetryMessage> entry : retryCache.asMap().entrySet()) {
                try {
                    RetryMessage retryMessage = entry.getValue();
                    Message message = retryMessage.message;
                    Session session = retryMessage.session;
                    int mqttMsgId = retryMessage.mqttMsgId;

                    if (System.currentTimeMillis() - retryMessage.timestamp < messageRetryInterval) {
                        continue;
                    }

                    if (MqttMessageType.PUBLISH.equals(retryMessage.mqttMessageType)) {
                        if (session == null || session.isDestroyed()) {
                            cleanRetryMessage(mqttMsgId, session.getChannelId());
                            continue;
                        }
                        if (retryMessage.mountTimeout()) {
                            saveRetryQueue(entry.getKey(), retryMessage);
                            cleanRetryMessage(mqttMsgId, session.getChannelId());
                            continue;
                        }
                        pushAction.write(session, message, mqttMsgId, retryMessage.qos, retryMessage.subscription);
                        retryMessage.timestamp = System.currentTimeMillis();
                        retryMessage.localRetryTime++;
                    } else if (MqttMessageType.PUBREL.equals(retryMessage.mqttMessageType)) {
                        if (session == null || session.isDestroyed() || retryMessage.mountRelTimeout()) {
                            retryCache.invalidate(entry.getKey());
                            logger.error("failed to retry pub rel more times,{},{}", session.getClientId(), mqttMsgId);
                            pushAction.rollNextByAck(session, mqttMsgId);
                            continue;
                        }
                        MqttFixedHeader pubRelMqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false,
                                MqttQoS.valueOf(retryMessage.qos), false, 0);
                        MqttMessage pubRelMqttMessage = new MqttMessage(pubRelMqttFixedHeader,
                                MqttMessageIdVariableHeader.from(mqttMsgId));
                        session.getChannel().writeAndFlush(pubRelMqttMessage);
                        retryMessage.localRetryTime++;
                        retryMessage.timestamp = System.currentTimeMillis();
                        logger.warn("retryPubRel:{},{}", session.getClientId(), mqttMsgId);
                    } else {
                        logger.error("error retry message type:{}", retryMessage.mqttMessageType);
                    }
                } catch (Exception e) {
                    logger.error("", e);
                }
            }
        } catch (Exception e) {
            logger.error("", e);
        }
    }