in artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java [182:296]
/**
 * Routes an incoming MQTT PUBLISH packet to the broker and acknowledges it.
 * <p>
 * While holding {@code lock} this method:
 * <ol>
 * <li>registers the anonymous producer on first use,</li>
 * <li>for MQTT 5 sessions, validates and resolves the {@code TOPIC_ALIAS} property,</li>
 * <li>converts the packet to a server message (marked durable-per-configuration when QoS &gt; 0),</li>
 * <li>sends it, and updates the retained message when the RETAIN flag is set, inside a new transaction,</li>
 * <li>acknowledges the packet according to its QoS.</li>
 * </ol>
 * A QoS 2 publish whose packet id is already recorded as received is not routed again; it is only logged and
 * acknowledged.
 *
 * @param message  the PUBLISH packet to route
 * @param internal when {@code true} the packet id is not recorded for QoS 2 de-duplication and authorization
 *                 failures are rethrown instead of being reported to the client; presumably marks publishes
 *                 originated by the broker itself (TODO confirm against callers)
 * @throws DisconnectException if an MQTT 5 topic alias is zero, exceeds the configured maximum, or refers to an
 *                             unknown mapping while the packet carries no topic name
 * @throws Exception           any failure raised while routing the message; the transaction is rolled back first
 */
void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception {
   synchronized (lock) {
      if (createProducer) {
         session.getServerSession().addProducer(senderName, MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME, ServerProducer.ANONYMOUS);
         createProducer = false;
      }
      String topic = message.variableHeader().topicName();
      if (session.getVersion() == MQTTVersion.MQTT_5) {
         Integer alias = MQTTUtil.getProperty(Integer.class, message.variableHeader().properties(), TOPIC_ALIAS);
         if (alias != null) {
            Integer topicAliasMax = session.getProtocolManager().getTopicAliasMaximum();
            if (alias == 0) {
               // [MQTT-3.3.2-8]
               throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
            } else if (topicAliasMax != null && alias > topicAliasMax) {
               // [MQTT-3.3.2-9]
               throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
            }
            String existingTopicMapping = session.getState().getClientTopicAlias(alias);
            if (existingTopicMapping == null) {
               if (topic == null || topic.isEmpty()) {
                  // using a topic alias with no matching topic in the state; potentially [MQTT-3.3.2-7]
                  throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
               }
               logger.debug("Adding new alias {} for topic {}", alias, topic);
               session.getState().putClientTopicAlias(alias, topic);
            } else if (topic != null && !topic.isEmpty()) {
               // the packet carries both an alias and a topic name: the new topic replaces the stored mapping
               logger.debug("Modifying existing alias {}. New value: {}; old value: {}", alias, topic, existingTopicMapping);
               session.getState().putClientTopicAlias(alias, topic);
            } else {
               // alias only: substitute the topic that was stored for it earlier
               logger.debug("Applying topic {} for alias {}", existingTopicMapping, alias);
               topic = existingTopicMapping;
            }
         }
      }
      String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(topic, session.getWildcardConfiguration());
      SimpleString address = SimpleString.of(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
      Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, address, message);
      int qos = message.fixedHeader().qosLevel().value();
      if (qos > 0) {
         serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
      }
      int packetId = message.variableHeader().packetId();
      boolean qos2PublishAlreadyReceived = state.getPubRec().contains(packetId);
      if (qos < 2 || !qos2PublishAlreadyReceived) {
         if (qos == 2 && !internal) {
            // remember the packet id so that a retransmission of this QoS 2 publish is not routed twice
            state.getPubRec().add(packetId);
         }
         Transaction tx = session.getServerSession().newTransaction();
         try {
            AddressInfo addressInfo = session.getServer().getAddressInfo(address);
            if (addressInfo == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
               session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
               serverMessage.setRoutingType(RoutingType.MULTICAST);
            }
            if (addressInfo != null) {
               serverMessage.setRoutingType(addressInfo.getRoutingType());
            }
            session.getServerSession().send(tx, serverMessage, true, senderName, false);
            if (message.fixedHeader().isRetain()) {
               ByteBuf payload = message.payload();
               // an empty payload is passed on as a "reset" of the retained message for this topic
               boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
               session.getRetainMessageManager().handleRetainedMessage(serverMessage, topic, reset, tx);
            }
            tx.commit();
         } catch (ActiveMQSecurityException e) {
            tx.rollback();
            if (internal) {
               throw e;
            }
            if (session.getVersion() == MQTTVersion.MQTT_5) {
               if (qos == 2) {
                  /*
                   * [MQTT-4.3.3-9] After sending a PUBREC with a Reason Code of 0x80 or greater the receiver MUST
                   * treat any subsequent PUBLISH with that Packet Identifier as a new Application Message. The id
                   * was recorded above, so forget it again; otherwise a later publish re-using it would be
                   * mistaken for a duplicate and acknowledged without being routed.
                   */
                  state.getPubRec().remove(Integer.valueOf(packetId));
               }
               sendMessageAck(internal, qos, packetId, MQTTReasonCodes.NOT_AUTHORIZED);
               return;
            } else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
               /*
                * For MQTT 3.1.1 clients:
                *
                * [MQTT-3.3.5-2] If a Server implementation does not authorize a PUBLISH to be performed by a Client;
                * it has no way of informing that Client. It MUST either make a positive acknowledgement, according
                * to the normal QoS rules, or close the Network Connection
                *
                * Throwing an exception here will ultimately close the connection. This is the default behavior.
                */
               if (closeMqttConnectionOnPublishAuthorizationFailure) {
                  throw e;
               } else {
                  logger.debug("MQTT 3.1.1 client not authorized to publish message.");
               }
            } else {
               /*
                * For MQTT 3.1 clients:
                *
                * Note that if a server implementation does not authorize a PUBLISH to be made by a client, it has no
                * way of informing that client. It must therefore make a positive acknowledgement, according to the
                * normal QoS rules, and the client will *not* be informed that it was not authorized to publish the
                * message.
                *
                * Log the failure since we have to just swallow it.
                */
               logger.debug("MQTT 3.1 client not authorized to publish message.");
            }
         } catch (Throwable t) {
            // NOTE(review): the packet id recorded above stays in the PUBREC set on this path; confirm that is
            // intended when the failure propagates to the caller.
            MQTTLogger.LOGGER.failedToPublishMqttMessage(t.getMessage(), t);
            tx.rollback();
            throw t;
         }
      } else {
         // only reachable for a QoS 2 publish whose packet id is already recorded: skip routing, just acknowledge
         MQTTLogger.LOGGER.ignoringQoS2Publish(state.getClientId(), packetId);
      }
      createMessageAck(packetId, qos, internal);
   }
}