in artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java [505:689]
private static ActiveMQMessage toAMQMessage(MessageReference reference,
ICoreMessage coreMessage,
WireFormat marshaller,
AMQConsumer consumer, UUID serverNodeUUID) throws IOException {
final ActiveMQMessage amqMsg;
final byte coreType = coreMessage.getType();
final Boolean compressProp = getObjectProperty(coreMessage, Boolean.class, OpenWireConstants.AMQ_MSG_COMPRESSED);
final boolean isCompressed = compressProp != null && compressProp;
final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
buffer.resetReaderIndex();
final byte[] bytes = switch (coreType) {
case org.apache.activemq.artemis.api.core.Message.BYTES_TYPE -> {
amqMsg = new EagerActiveMQBytesMessage(0);
yield toAMQMessageBytesType(buffer, isCompressed);
}
case org.apache.activemq.artemis.api.core.Message.MAP_TYPE -> {
amqMsg = new ActiveMQMapMessage();
yield toAMQMessageMapType(buffer, isCompressed);
}
case org.apache.activemq.artemis.api.core.Message.OBJECT_TYPE -> {
amqMsg = new ActiveMQObjectMessage();
yield toAMQMessageObjectType(buffer, isCompressed);
}
case org.apache.activemq.artemis.api.core.Message.STREAM_TYPE -> {
amqMsg = new ActiveMQStreamMessage();
yield toAMQMessageStreamType(buffer, isCompressed);
}
case org.apache.activemq.artemis.api.core.Message.TEXT_TYPE -> {
amqMsg = new ActiveMQTextMessage();
yield toAMQMessageTextType(buffer, isCompressed);
}
case org.apache.activemq.artemis.api.core.Message.DEFAULT_TYPE -> {
amqMsg = new ActiveMQMessage();
yield toAMQMessageDefaultType(buffer, isCompressed);
}
default -> throw new IllegalStateException("Unknown message type: " + coreMessage.getType());
};
final String type = getObjectProperty(coreMessage, String.class, OpenWireConstants.JMS_TYPE_PROPERTY);
if (type != null) {
amqMsg.setJMSType(type);
}
amqMsg.setPersistent(coreMessage.isDurable());
amqMsg.setExpiration(coreMessage.getExpiration());
amqMsg.setPriority(coreMessage.getPriority());
amqMsg.setTimestamp(coreMessage.getTimestamp());
Long brokerInTime = getObjectProperty(coreMessage, Long.class, OpenWireConstants.AMQ_MSG_BROKER_IN_TIME);
if (brokerInTime == null) {
brokerInTime = 0L;
}
amqMsg.setBrokerInTime(brokerInTime);
amqMsg.setCompressed(isCompressed);
//we need check null because messages may come from other clients
//and those amq specific attribute may not be set.
Long arrival = getObjectProperty(coreMessage, Long.class, OpenWireConstants.AMQ_MSG_ARRIVAL);
if (arrival == null) {
//messages from other sources (like core client) may not set this prop
arrival = 0L;
}
amqMsg.setArrival(arrival);
final SimpleString brokerPath = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_BROKER_PATH);
if (brokerPath != null && !brokerPath.isEmpty()) {
setAMQMsgBrokerPath(amqMsg, brokerPath.toString());
}
final SimpleString clusterPath = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_CLUSTER);
if (clusterPath != null && !clusterPath.isEmpty()) {
setAMQMsgClusterPath(amqMsg, clusterPath.toString());
}
Integer commandId = getObjectProperty(coreMessage, Integer.class, OpenWireConstants.AMQ_MSG_COMMAND_ID);
if (commandId == null) {
commandId = -1;
}
amqMsg.setCommandId(commandId);
final Object correlationID = coreMessage.getCorrelationID();
if (correlationID instanceof String || correlationID instanceof SimpleString) {
amqMsg.setCorrelationId(correlationID.toString());
} else if (correlationID instanceof byte[] correlationIdBytes) {
try {
amqMsg.setCorrelationId(StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(correlationIdBytes)).toString());
} catch (MalformedInputException e) {
ActiveMQServerLogger.LOGGER.unableToDecodeCorrelationId(e.getMessage());
}
}
final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, OpenWireConstants.AMQ_MSG_DATASTRUCTURE);
if (dsBytes != null) {
setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
}
final ActiveMQDestination actualDestination = consumer.getOpenwireDestination();
amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
final Object value = coreMessage.getGroupID();
if (value != null) {
String groupId = value.toString();
amqMsg.setGroupID(groupId);
}
amqMsg.setGroupSequence(coreMessage.getGroupSequence());
final Object messageIdValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_MESSAGE_ID);
final MessageId messageId;
if (messageIdValue instanceof SimpleString) {
messageId = new MessageId(messageIdValue.toString());
} else if (messageIdValue instanceof byte[] messageIdBytes) {
ByteSequence midSeq = new ByteSequence(messageIdBytes);
messageId = (MessageId) marshaller.unmarshal(midSeq);
} else {
// ARTEMIS-3776 due to AMQ-6431 some older clients will not be able to receive messages
// if using a failover schema due to the messageID overFlowing Integer.MAX_VALUE
String midd = "ID:" + serverNodeUUID + ":-1:-1:" + (coreMessage.getMessageID() / Integer.MAX_VALUE);
messageId = new MessageId(midd, coreMessage.getMessageID() % Integer.MAX_VALUE);
}
amqMsg.setMessageId(messageId);
final Object origDestValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_ORIG_DESTINATION);
if (origDestValue instanceof SimpleString) {
amqMsg.setOriginalDestination(ActiveMQDestination.createDestination(origDestValue.toString(), QUEUE_TYPE));
} else if (origDestValue instanceof byte[] origDestValueBytes) {
ActiveMQDestination origDest = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(origDestValueBytes));
amqMsg.setOriginalDestination(origDest);
}
final Object producerIdValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_PRODUCER_ID);
if (producerIdValue instanceof SimpleString simpleString && !simpleString.isEmpty()) {
amqMsg.setProducerId(new ProducerId(producerIdValue.toString()));
} else if (producerIdValue instanceof byte[] producerIdBytes) {
ProducerId producerId = (ProducerId) marshaller.unmarshal(new ByteSequence(producerIdBytes));
amqMsg.setProducerId(producerId);
}
amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
final Object replyToValue = getObjectProperty(coreMessage, Object.class, OpenWireConstants.AMQ_MSG_REPLY_TO);
if (replyToValue instanceof SimpleString) {
amqMsg.setReplyTo(ActiveMQDestination.createDestination(replyToValue.toString(), QUEUE_TYPE));
} else if (replyToValue instanceof byte[] replyToBytes) {
ActiveMQDestination replyTo = (ActiveMQDestination) marshaller.unmarshal(new ByteSequence(replyToBytes));
amqMsg.setReplyTo(replyTo);
}
final SimpleString userId = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_USER_ID);
if (userId != null && !userId.isEmpty()) {
amqMsg.setUserID(userId.toString());
}
final Boolean isDroppable = getObjectProperty(coreMessage, Boolean.class, OpenWireConstants.AMQ_MSG_DROPPABLE);
if (isDroppable != null) {
amqMsg.setDroppable(isDroppable);
}
final SimpleString dlqCause = getObjectProperty(coreMessage, SimpleString.class, OpenWireConstants.AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
if (dlqCause != null) {
setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
}
final SimpleString lastValueProperty = coreMessage.getLastValueProperty();
if (lastValueProperty != null) {
setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
}
final Long ingressTimestamp = getObjectProperty(coreMessage, Long.class, HDR_INGRESS_TIMESTAMP);
if (ingressTimestamp != null) {
setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp);
}
final Set<SimpleString> props = coreMessage.getPropertyNames();
if (props != null) {
setAMQMsgObjectProperties(amqMsg, coreMessage, props);
}
if (bytes != null) {
ByteSequence content = new ByteSequence(bytes);
amqMsg.setContent(content);
}
return amqMsg;
}