in artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java [295:417]
/**
 * Emits a trace-level summary of an MQTT packet.
 * <p>
 * Nothing happens unless trace logging is enabled, and nothing is logged for a message whose fixed header is
 * {@code null}. The line starts with {@code MQTT(<clientId>): IN << } or {@code MQTT(<clientId>): OUT >> },
 * followed by the message type, the message id (when the variable header carries one) and details specific to
 * the message type. For CONNECT the username and password are intentionally left out.
 *
 * @param state   the session state providing the client id; may be {@code null}, in which case the client id is
 *                omitted
 * @param message the MQTT message to describe
 * @param inbound {@code true} to mark the message as received ({@code IN <<}), {@code false} to mark it as sent
 *                ({@code OUT >>})
 * @param version the MQTT version in use; a reason code is only logged for PUBREC, PUBREL, PUBCOMP and DISCONNECT
 *                when this is {@code MQTTVersion.MQTT_5}
 */
public static void logMessage(MQTTSessionState state, MqttMessage message, boolean inbound, MQTTVersion version) {
   if (!logger.isTraceEnabled()) {
      return;
   }

   StringBuilder log = new StringBuilder("MQTT(");
   if (state != null) {
      log.append(state.getClientId());
   }
   log.append(inbound ? "): IN << " : "): OUT >> ");

   if (message.fixedHeader() == null) {
      // a message without a fixed header is not logged at all
      return;
   }

   log.append(message.fixedHeader().messageType().toString());
   if (message.variableHeader() instanceof MqttMessageIdVariableHeader idHeader) {
      log.append("(").append(idHeader.messageId()).append(")");
   }

   switch (message.fixedHeader().messageType()) {
      case PUBLISH -> appendPublishDetails(log, message);
      case CONNECT -> appendConnectDetails(log, message);
      case CONNACK -> appendConnAckDetails(log, message);
      case SUBSCRIBE -> {
         for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions()) {
            log.append("\n\ttopic: ").append(sub.topicName())
               .append(", qos: ").append(sub.qualityOfService())
               .append(", nolocal: ").append(sub.option().isNoLocal())
               .append(", retainHandling: ").append(sub.option().retainHandling())
               .append(", isRetainAsPublished: ").append(sub.option().isRetainAsPublished());
         }
      }
      case SUBACK -> {
         for (Integer qos : ((MqttSubAckMessage) message).payload().grantedQoSLevels()) {
            log.append("\n\t").append(qos);
         }
      }
      case UNSUBSCRIBE -> {
         for (String topic : ((MqttUnsubscribeMessage) message).payload().topics()) {
            log.append("\n\t").append(topic);
         }
      }
      case PUBACK -> {
         // no additional details are logged for PUBACK
      }
      case PUBREC, PUBREL, PUBCOMP -> {
         // The type test keeps trace logging from throwing if the header is missing or of another type.
         if (version == MQTTVersion.MQTT_5 && message.variableHeader() instanceof MqttPubReplyMessageVariableHeader replyHeader) {
            log.append(" reasonCode=").append(formatByte(replyHeader.reasonCode()));
         }
      }
      case DISCONNECT -> {
         // Same defensive type test as for the publish replies above.
         if (version == MQTTVersion.MQTT_5 && message.variableHeader() instanceof MqttReasonCodeAndPropertiesVariableHeader disconnectHeader) {
            log.append(" reasonCode=").append(formatByte(disconnectHeader.reasonCode()));
         }
      }
   }

   logger.trace(log.toString());
}

/**
 * Appends the PUBLISH specific details: packet id, topic, fixed-header flags, properties and payload.
 */
private static void appendPublishDetails(StringBuilder log, MqttMessage message) {
   MqttPublishVariableHeader publishHeader = (MqttPublishVariableHeader) message.variableHeader();
   String topicName = publishHeader.topicName();
   if (topicName == null || topicName.isEmpty()) {
      topicName = "<empty>";
   }
   log.append("(").append(publishHeader.packetId()).append(")")
      .append(" topic=").append(topicName)
      .append(", qos=").append(message.fixedHeader().qosLevel().value())
      .append(", retain=").append(message.fixedHeader().isRetain())
      .append(", dup=").append(message.fixedHeader().isDup())
      .append(", remainingLength=").append(message.fixedHeader().remainingLength());
   for (MqttProperties.MqttProperty property : ((MqttPublishMessage) message).variableHeader().properties().listAll()) {
      appendProperty(log, property, describePublishPropertyValue(property.value()));
   }
   log.append(", payload=" + getPayloadForLogging((MqttPublishMessage) message, 256));
}

/**
 * Appends the CONNECT specific details. The username and password are intentionally omitted from the log.
 */
private static void appendConnectDetails(StringBuilder log, MqttMessage message) {
   MqttConnectVariableHeader connectHeader = (MqttConnectVariableHeader) message.variableHeader();
   MqttConnectPayload payload = ((MqttConnectMessage) message).payload();
   log.append(" protocol=(").append(connectHeader.name()).append(", ").append(connectHeader.version()).append(")")
      .append(", hasPassword=").append(connectHeader.hasPassword())
      .append(", isCleanStart=").append(connectHeader.isCleanSession())
      .append(", keepAliveTimeSeconds=").append(connectHeader.keepAliveTimeSeconds())
      .append(", clientIdentifier=").append(payload.clientIdentifier())
      .append(", hasUserName=").append(connectHeader.hasUserName())
      .append(", isWillFlag=").append(connectHeader.isWillFlag());
   if (connectHeader.isWillFlag()) {
      log.append(", willQos=").append(connectHeader.willQos())
         .append(", isWillRetain=").append(connectHeader.isWillRetain())
         .append(", willTopic=").append(payload.willTopic());
   }
   // Property values are appended using their default string conversion; they are not decoded here.
   for (MqttProperties.MqttProperty property : connectHeader.properties().listAll()) {
      appendProperty(log, property, property.value());
   }
}

/**
 * Appends the CONNACK specific details: reason code, session-present flag and properties.
 */
private static void appendConnAckDetails(StringBuilder log, MqttMessage message) {
   MqttConnAckVariableHeader connackHeader = (MqttConnAckVariableHeader) message.variableHeader();
   log.append(" connectReasonCode=").append(formatByte(connackHeader.connectReturnCode().byteValue()))
      .append(", sessionPresent=").append(connackHeader.isSessionPresent());
   // Property values are appended using their default string conversion; they are not decoded here.
   for (MqttProperties.MqttProperty property : connackHeader.properties().listAll()) {
      appendProperty(log, property, property.value());
   }
}

/**
 * Appends {@code ", <name>=<value>"} for a single property, where the name is the property type's enum name
 * passed through {@code formatCase}.
 */
private static void appendProperty(StringBuilder log, MqttProperties.MqttProperty property, Object value) {
   log.append(", " + formatCase(MqttPropertyType.valueOf(property.propertyId()).name()) + "=" + value);
}

/**
 * Converts a PUBLISH property value into something readable: byte arrays are decoded as UTF-8 and a non-empty
 * {@code ArrayList} whose first element is a {@code StringPair} is rendered as {@code [key: value, ...]}.
 * Any other value (including {@code null}) is returned unchanged.
 */
private static Object describePublishPropertyValue(Object value) {
   if (value instanceof byte[] bytes) {
      return new String(bytes, StandardCharsets.UTF_8);
   }
   if (value instanceof ArrayList<?> list && !list.isEmpty() && list.get(0) instanceof MqttProperties.StringPair) {
      StringBuilder userProperties = new StringBuilder("[");
      String separator = "";
      for (Object element : list) {
         // per-element cast instead of an unchecked cast of the whole list
         MqttProperties.StringPair pair = (MqttProperties.StringPair) element;
         userProperties.append(separator).append(pair.key).append(": ").append(pair.value);
         separator = ", ";
      }
      return userProperties.append("]").toString();
   }
   return value;
}