public void push()

in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java [83:118]


    /**
     * Pushes a single message to the client behind {@code session} on behalf of
     * {@code subscription}.
     *
     * <p>The method allocates an MQTT message id for the client, records the delivery in the
     * pending-down cache, skips messages that were stored before a clean session started,
     * replaces the empty-payload placeholder with a real empty payload, and finally writes the
     * message using the effective QoS. For QoS 0 the next message is rolled immediately; for
     * higher QoS the publish is first registered with the retry driver.
     *
     * @param message      the message to deliver; its payload may be replaced in place when it
     *                     carries the empty-payload placeholder
     * @param subscription the subscription being served; supplies the default QoS and the P2P flag
     * @param session      the session whose channel receives the message
     * @param queue        the queue recorded alongside the message in the pending-down cache
     */
    public void push(Message message, Subscription subscription, Session session, Queue queue) {
        String clientId = session.getClientId();
        int mqttId = mqttMsgId.nextId(clientId);
        inFlyCache.getPendingDownCache().put(session.getChannelId(), mqttId, subscription, queue, message);

        try {
            // A clean session must not receive messages stored before it started:
            // drop such a message and move on to the next one.
            boolean storedBeforeSessionStart = message.getStoreTimestamp() > 0
                && message.getStoreTimestamp() < session.getStartTime();
            if (session.isClean() && storedBeforeSessionStart) {
                logger.warn("old msg:{},{},{},{}", clientId, message.getMsgId(),
                    message.getStoreTimestamp(), session.getStartTime());
                rollNext(session, mqttId);
                return;
            }
        } catch (Exception e) {
            // Best effort: a failure in the staleness check must not block delivery.
            logger.error("", e);
        }

        // Deal with a message that carries the empty-payload placeholder.
        // The flag is checked first so the payload is only decoded for messages marked empty,
        // instead of allocating a String for every pushed message.
        // NOTE(review): the decode uses the platform default charset; presumably this matches
        // how the placeholder was encoded -- confirm against the producer side.
        if (message.isEmpty()) {
            String payloadText = new String(message.getPayload());
            if (payloadText.equals(MessageUtil.EMPTYSTRING)) {
                message.setPayload(new byte[0]);
            }
        }

        // Effective QoS: the subscription's QoS, unless this is a P2P subscription and the
        // message carries its own QoS.
        int qos = subscription.getQos();
        if (subscription.isP2p() && message.qos() != null) {
            qos = message.qos();
        }

        if (qos == 0) {
            write(session, message, mqttId, 0, subscription);
            rollNextByAck(session, mqttId);
        } else {
            // Register the retry with the same effective QoS used for the initial write, so a
            // retransmission is not sent with a different QoS than the original publish.
            retryDriver.mountPublish(mqttId, message, qos, ChannelInfo.getId(session.getChannel()), subscription);
            write(session, message, mqttId, qos, subscription);
        }
    }