in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/session/infly/PushAction.java [83:118]
public void push(Message message, Subscription subscription, Session session, Queue queue) {
String clientId = session.getClientId();
int mqttId = mqttMsgId.nextId(clientId);
inFlyCache.getPendingDownCache().put(session.getChannelId(), mqttId, subscription, queue, message);
try {
if (session.isClean()) {
if (message.getStoreTimestamp() > 0 &&
message.getStoreTimestamp() < session.getStartTime()) {
logger.warn("old msg:{},{},{},{}", session.getClientId(), message.getMsgId(),
message.getStoreTimestamp(), session.getStartTime());
rollNext(session, mqttId);
return;
}
}
} catch (Exception e) {
logger.error("", e);
}
//deal with message with empty payload
String msgPayLoad = new String(message.getPayload());
if (msgPayLoad.equals(MessageUtil.EMPTYSTRING) && message.isEmpty()) {
message.setPayload("".getBytes());
}
int qos = subscription.getQos();
if (subscription.isP2p() && message.qos() != null) {
qos = message.qos();
}
if (qos == 0) {
write(session, message, mqttId, 0, subscription);
rollNextByAck(session, mqttId);
} else {
retryDriver.mountPublish(mqttId, message, subscription.getQos(), ChannelInfo.getId(session.getChannel()), subscription);
write(session, message, mqttId, qos, subscription);
}
}