void sendToQueue()

in artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java [182:296]


   /**
    * Routes an incoming MQTT PUBLISH packet into the broker and acknowledges it according to its QoS.
    * <p>
    * The whole operation runs while holding {@code lock}. On first use the sender is registered as a producer on
    * the server session. For MQTT 5 sessions a Topic Alias property on the packet is validated and resolved
    * against the session state before the topic is converted to a core address. The message is then sent inside a
    * new transaction, together with any retained-message update, and the transaction is committed.
    * <p>
    * A QoS 2 PUBLISH whose packet id is already recorded in the session's PUBREC collection is not sent again; it
    * is only logged and acknowledged.
    *
    * @param message  the PUBLISH packet to route
    * @param internal forwarded to the acknowledgement calls; when {@code true} the packet id is not recorded for
    *                 QoS 2 duplicate detection and an authorization failure is rethrown to the caller
    * @throws DisconnectException if an MQTT 5 topic alias is 0, exceeds the configured maximum, or is unknown while
    *                             the packet carries no topic name
    * @throws Exception           any other failure raised while sending; the transaction is rolled back first
    */
   void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception {
      synchronized (lock) {
         // lazily register this sender as an anonymous producer the first time anything is published
         if (createProducer) {
            session.getServerSession().addProducer(senderName, MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME, ServerProducer.ANONYMOUS);
            createProducer = false;
         }
         String topic = message.variableHeader().topicName();
         if (session.getVersion() == MQTTVersion.MQTT_5) {
            Integer alias = MQTTUtil.getProperty(Integer.class, message.variableHeader().properties(), TOPIC_ALIAS);
            if (alias != null) {
               Integer topicAliasMax = session.getProtocolManager().getTopicAliasMaximum();
               if (alias == 0) {
                  // [MQTT-3.3.2-8]
                  throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
               } else if (topicAliasMax != null && alias > topicAliasMax) {
                  // [MQTT-3.3.2-9]
                  throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
               }

               String existingTopicMapping = session.getState().getClientTopicAlias(alias);
               if (existingTopicMapping == null) {
                  if (topic == null || topic.isEmpty()) {
                     // using a topic alias with no matching topic in the state; potentially [MQTT-3.3.2-7]
                     throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
                  }
                  logger.debug("Adding new alias {} for topic {}", alias, topic);
                  session.getState().putClientTopicAlias(alias, topic);
               } else if (topic != null && !topic.isEmpty()) {
                  // alias already known and a topic name was supplied: the supplied name replaces the mapping
                  logger.debug("Modifying existing alias {}. New value: {}; old value: {}", alias, topic, existingTopicMapping);
                  session.getState().putClientTopicAlias(alias, topic);
               } else {
                  // alias already known and no topic name supplied: publish to the previously mapped topic
                  logger.debug("Applying topic {} for alias {}", existingTopicMapping, alias);
                  topic = existingTopicMapping;
               }
            }
         }
         String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(topic, session.getWildcardConfiguration());
         SimpleString address = SimpleString.of(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
         Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, address, message);
         int qos = message.fixedHeader().qosLevel().value();
         if (qos > 0) {
            serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
         }
         int packetId = message.variableHeader().packetId();
         boolean qos2PublishAlreadyReceived = state.getPubRec().contains(packetId);
         if (qos < 2 || !qos2PublishAlreadyReceived) {
            // remember the packet id of a client-originated QoS 2 publish so a retransmission is not routed twice
            if (qos == 2 && !internal) {
               state.getPubRec().add(packetId);
            }

            Transaction tx = session.getServerSession().newTransaction();
            try {
               AddressInfo addressInfo = session.getServer().getAddressInfo(address);
               if (addressInfo == null && session.getServer().getAddressSettingsRepository().getMatch(coreAddress).isAutoCreateAddresses()) {
                  session.getServerSession().createAddress(address, RoutingType.MULTICAST, true);
                  serverMessage.setRoutingType(RoutingType.MULTICAST);
               }
               if (addressInfo != null) {
                  serverMessage.setRoutingType(addressInfo.getRoutingType());
               }
               session.getServerSession().send(tx, serverMessage, true, senderName, false);

               if (message.fixedHeader().isRetain()) {
                  ByteBuf payload = message.payload();
                  // an empty payload on a retained publish is passed on as a "reset" of the retained message
                  boolean reset = payload instanceof EmptyByteBuf || payload.capacity() == 0;
                  session.getRetainMessageManager().handleRetainedMessage(serverMessage, topic, reset, tx);
               }
               tx.commit();
            } catch (ActiveMQSecurityException e) {
               tx.rollback();
               if (internal) {
                  throw e;
               }
               if (session.getVersion() == MQTTVersion.MQTT_5) {
                  if (qos == 2) {
                     /*
                      * [MQTT-4.3.3-9] After a failure acknowledgement (reason code >= 0x80) any subsequent PUBLISH
                      * with this packet id MUST be treated as a new application message, so undo the registration
                      * made above. Integer.valueOf forces removal by value rather than by index.
                      */
                     state.getPubRec().remove(Integer.valueOf(packetId));
                  }
                  sendMessageAck(internal, qos, packetId, MQTTReasonCodes.NOT_AUTHORIZED);
                  return;
               } else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
                  /*
                   * For MQTT 3.1.1 clients:
                   *
                   * [MQTT-3.3.5-2] If a Server implementation does not authorize a PUBLISH to be performed by a Client;
                   * it has no way of informing that Client. It MUST either make a positive acknowledgement, according
                   * to the normal QoS rules, or close the Network Connection
                   *
                   * Throwing an exception here will ultimately close the connection. This is the default behavior.
                   */
                  if (closeMqttConnectionOnPublishAuthorizationFailure) {
                     throw e;
                  } else {
                     logger.debug("MQTT 3.1.1 client not authorized to publish message.");
                  }
               } else {
                  /*
                   * For MQTT 3.1 clients:
                   *
                   * Note that if a server implementation does not authorize a PUBLISH to be made by a client, it has no
                   * way of informing that client. It must therefore make a positive acknowledgement, according to the
                   * normal QoS rules, and the client will *not* be informed that it was not authorized to publish the
                   * message.
                   *
                   * Log the failure since we have to just swallow it.
                   */
                  logger.debug("MQTT 3.1 client not authorized to publish message.");
               }
            } catch (Throwable t) {
               // NOTE(review): for QoS 2 the packet id recorded above stays registered on this path, so a
               // retransmission of this PUBLISH would be treated as already received - confirm this is intended.
               MQTTLogger.LOGGER.failedToPublishMqttMessage(t.getMessage(), t);
               tx.rollback();
               throw t;
            }
         } else {
            // only reachable for QoS 2 with a packet id that is already registered: do not route it again
            MQTTLogger.LOGGER.ignoringQoS2Publish(state.getClientId(), packetId);
         }

         // reached for successful sends, swallowed MQTT 3.x authorization failures and ignored QoS 2 duplicates
         createMessageAck(packetId, qos, internal);
      }
   }