public static void logMessage()

in artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java [295:417]


   /**
    * Writes a single TRACE-level log entry describing an MQTT control packet.
    * <p>
    * Nothing is built or logged unless TRACE is enabled on {@code logger}. The entry starts with
    * {@code MQTT(<clientId>): IN << } or {@code MQTT(<clientId>): OUT >> } followed by the packet type, the
    * packet id (when the variable header carries one) and type-specific details. A message without a fixed
    * header is not logged at all.
    * <p>
    * For CONNECT packets only the {@code hasUserName} / {@code hasPassword} flags are written; the user name
    * and password themselves are intentionally never logged.
    *
    * @param state   session state used to obtain the client id for the prefix; may be {@code null}, in which
    *                case the client id is left empty
    * @param message the MQTT message to describe
    * @param inbound {@code true} if the message was received, {@code false} if it is being sent
    * @param version protocol version of the session; reason codes are only logged for {@code MQTT_5}
    */
   public static void logMessage(MQTTSessionState state, MqttMessage message, boolean inbound, MQTTVersion version) {
      if (!logger.isTraceEnabled()) {
         return;
      }

      StringBuilder log = new StringBuilder("MQTT(");
      if (state != null) {
         log.append(state.getClientId());
      }
      log.append(inbound ? "): IN << " : "): OUT >> ");

      var fixedHeader = message.fixedHeader();
      if (fixedHeader == null) {
         // without a fixed header there is no packet type to report, so nothing is logged
         return;
      }

      log.append(fixedHeader.messageType());

      if (message.variableHeader() instanceof MqttMessageIdVariableHeader idHeader) {
         log.append('(').append(idHeader.messageId()).append(')');
      }

      switch (fixedHeader.messageType()) {
         case PUBLISH -> appendPublishDetails(log, (MqttPublishMessage) message);
         case CONNECT -> appendConnectDetails(log, (MqttConnectMessage) message);
         case CONNACK -> appendConnAckDetails(log, (MqttConnAckVariableHeader) message.variableHeader());
         case SUBSCRIBE -> {
            for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions()) {
               log.append("\n\ttopic: ").append(sub.topicName())
                  .append(", qos: ").append(sub.qualityOfService())
                  .append(", nolocal: ").append(sub.option().isNoLocal())
                  .append(", retainHandling: ").append(sub.option().retainHandling())
                  .append(", isRetainAsPublished: ").append(sub.option().isRetainAsPublished());
            }
         }
         case SUBACK -> {
            for (Integer qos : ((MqttSubAckMessage) message).payload().grantedQoSLevels()) {
               log.append("\n\t").append(qos);
            }
         }
         case UNSUBSCRIBE -> {
            for (String topic : ((MqttUnsubscribeMessage) message).payload().topics()) {
               log.append("\n\t").append(topic);
            }
         }
         case PUBACK -> {
            // nothing beyond the packet type and id appended above
         }
         case PUBREC, PUBREL, PUBCOMP -> {
            // Pattern match rather than cast: an unexpected or missing variable header must not make
            // trace logging throw, the reason code is simply left out in that case.
            if (version == MQTTVersion.MQTT_5 && message.variableHeader() instanceof MqttPubReplyMessageVariableHeader pubReplyVariableHeader) {
               appendReasonCode(log, pubReplyVariableHeader.reasonCode());
            }
         }
         case DISCONNECT -> {
            // Same defensive pattern match as for the publish replies above.
            if (version == MQTTVersion.MQTT_5 && message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader disconnectVariableHeader) {
               appendReasonCode(log, disconnectVariableHeader.reasonCode());
            }
         }
      }

      logger.trace(log.toString());
   }

   // Appends packet id, topic, fixed-header flags, decoded properties and a truncated payload of a PUBLISH.
   private static void appendPublishDetails(StringBuilder log, MqttPublishMessage publish) {
      MqttPublishVariableHeader publishHeader = publish.variableHeader();
      String topicName = publishHeader.topicName();
      if (topicName == null || topicName.isEmpty()) {
         topicName = "<empty>";
      }
      var fixedHeader = publish.fixedHeader();
      log.append('(').append(publishHeader.packetId()).append(')')
         .append(" topic=").append(topicName)
         .append(", qos=").append(fixedHeader.qosLevel().value())
         .append(", retain=").append(fixedHeader.isRetain())
         .append(", dup=").append(fixedHeader.isDup())
         .append(", remainingLength=").append(fixedHeader.remainingLength());
      appendProperties(log, publishHeader.properties(), true);
      log.append(", payload=").append(getPayloadForLogging(publish, 256));
   }

   // Appends the CONNECT flags, client id, will details and properties; never the user name or password.
   private static void appendConnectDetails(StringBuilder log, MqttConnectMessage connect) {
      MqttConnectVariableHeader connectHeader = connect.variableHeader();
      MqttConnectPayload payload = connect.payload();
      log.append(" protocol=(").append(connectHeader.name()).append(", ").append(connectHeader.version()).append(")")
         .append(", hasPassword=").append(connectHeader.hasPassword())
         .append(", isCleanStart=").append(connectHeader.isCleanSession())
         .append(", keepAliveTimeSeconds=").append(connectHeader.keepAliveTimeSeconds())
         .append(", clientIdentifier=").append(payload.clientIdentifier())
         .append(", hasUserName=").append(connectHeader.hasUserName())
         .append(", isWillFlag=").append(connectHeader.isWillFlag());
      if (connectHeader.isWillFlag()) {
         log.append(", willQos=").append(connectHeader.willQos())
            .append(", isWillRetain=").append(connectHeader.isWillRetain())
            .append(", willTopic=").append(payload.willTopic());
      }
      // values are appended in their default string form; binary values are deliberately not decoded here
      appendProperties(log, connectHeader.properties(), false);
   }

   // Appends the return code, session-present flag and properties of a CONNACK.
   private static void appendConnAckDetails(StringBuilder log, MqttConnAckVariableHeader connackHeader) {
      log.append(" connectReasonCode=").append(formatByte(connackHeader.connectReturnCode().byteValue()))
         .append(", sessionPresent=").append(connackHeader.isSessionPresent());
      appendProperties(log, connackHeader.properties(), false);
   }

   // Appends every property as ", <name>=<value>"; when decodeValues is set the value goes through describePropertyValue.
   private static void appendProperties(StringBuilder log, MqttProperties properties, boolean decodeValues) {
      for (MqttProperties.MqttProperty property : properties.listAll()) {
         Object value = decodeValues ? describePropertyValue(property.value()) : property.value();
         log.append(", ").append(formatCase(MqttPropertyType.valueOf(property.propertyId()).name()))
            .append('=').append(value);
      }
   }

   // Renders a property value for logging: byte[] as UTF-8 text, a non-empty list of string pairs as "[k: v, ...]".
   private static String describePropertyValue(Object value) {
      if (value instanceof byte[] bytes) {
         return new String(bytes, StandardCharsets.UTF_8);
      }
      if (value instanceof ArrayList<?> list && !list.isEmpty() && list.get(0) instanceof MqttProperties.StringPair) {
         StringBuilder userProperties = new StringBuilder("[");
         String separator = "";
         for (Object element : list) {
            MqttProperties.StringPair pair = (MqttProperties.StringPair) element;
            userProperties.append(separator).append(pair.key).append(": ").append(pair.value);
            separator = ", ";
         }
         return userProperties.append(']').toString();
      }
      // covers null ("null") and every other value type
      return String.valueOf(value);
   }

   // Appends " reasonCode=<formatted byte>".
   private static void appendReasonCode(StringBuilder log, byte reasonCode) {
      log.append(" reasonCode=").append(formatByte(reasonCode));
   }