in artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java [116:243]
/**
 * Converts an AMQP message, supplied together with its already decoded sections, into a Core message.
 * <p>
 * The kind of Core message created depends on the AMQP body section and, for some body types, on the
 * content-type found in the AMQP properties:
 * <ul>
 * <li>no body: object, bytes, text or plain message depending on the content-type</li>
 * <li>{@code Data}: object, bytes or (for decodable UTF-8 textual content) text message</li>
 * <li>{@code AmqpSequence}: stream message holding each element of the sequence</li>
 * <li>{@code AmqpValue}: text, object, bytes, stream or map message depending on the value type; any
 * other value type results in a bytes message carrying the encoded body section</li>
 * </ul>
 * Except for the last case, the original body encoding is recorded on the result under
 * {@code JMS_AMQP_ORIGINAL_ENCODING}.
 * <p>
 * After the body is handled the header, message annotations, application properties, properties, footer
 * and extra properties are processed onto the result, and durability, priority, address and routing type
 * are copied from the source message before the result is encoded.
 *
 * @param message                the source AMQP message; provides the message ID, expiration, extra
 *                               properties, durability, priority, address and routing type
 * @param coreMessageObjectPools object pools handed to the message factory methods
 * @param header                 the AMQP header section; when {@code null} the TTL based expiration
 *                               fallback is skipped
 * @param annotations            the AMQP message annotations section
 * @param properties             the AMQP properties section; may be {@code null}, in which case no
 *                               content-type is assumed
 * @param applicationProperties  the AMQP application properties section
 * @param body                   the AMQP body section; may be {@code null}
 * @param footer                 the AMQP footer section
 * @return the Core message wrapped by the converted result
 * @throws ConversionException declared by this method; presumably raised by the factory / process
 *                             helpers which are not visible here
 * @throws RuntimeException    if the body section is of an unexpected type
 */
public static ICoreMessage toCore(AMQPMessage message, CoreMessageObjectPools coreMessageObjectPools, Header header, MessageAnnotations annotations, Properties properties, ApplicationProperties applicationProperties, Section body, Footer footer) throws ConversionException {
   final long messageId = message.getMessageID();
   final Symbol contentType = properties != null ? properties.getContentType() : null;
   final String contentTypeString = contentType != null ? contentType.toString() : null;

   CoreMessageWrapper result;

   if (body == null) {
      // No body section: the content-type alone decides which (empty) message type is created.
      if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
         result = createObjectMessage(messageId, coreMessageObjectPools);
      } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType) || isContentType(null, contentType)) {
         result = createBytesMessage(messageId, coreMessageObjectPools);
      } else {
         Charset charset = getCharsetForTextualContent(contentTypeString);
         if (charset != null) {
            result = createTextMessage(messageId, coreMessageObjectPools);
         } else {
            result = createMessage(messageId, coreMessageObjectPools);
         }
      }

      result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_NULL);
   } else if (body instanceof Data data) {
      Binary payload = data.getValue();

      if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
         result = createObjectMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
      } else if (isContentType(OCTET_STREAM_CONTENT_TYPE, contentType)) {
         result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
      } else {
         Charset charset = getCharsetForTextualContent(contentTypeString);
         // Only UTF-8 textual content is turned into a text message; everything else stays as bytes.
         if (StandardCharsets.UTF_8.equals(charset)) {
            ByteBuffer buf = ByteBuffer.wrap(payload.getArray(), payload.getArrayOffset(), payload.getLength());

            try {
               CharBuffer chars = charset.newDecoder().decode(buf);
               result = createTextMessage(messageId, String.valueOf(chars), coreMessageObjectPools);
            } catch (CharacterCodingException e) {
               // The payload is not valid UTF-8 after all, so deliberately fall back to a bytes message.
               result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
            }
         } else {
            result = createBytesMessage(messageId, payload.getArray(), payload.getArrayOffset(), payload.getLength(), coreMessageObjectPools);
         }
      }

      result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_DATA);
   } else if (body instanceof AmqpSequence amqpSequence) {
      CoreStreamMessageWrapper m = createStreamMessage(messageId, coreMessageObjectPools);
      for (Object item : amqpSequence.getValue()) {
         m.writeObject(item);
      }

      result = m;
      result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_SEQUENCE);
   } else if (body instanceof AmqpValue amqpValue) {
      Object value = amqpValue.getValue();
      if (value == null || value instanceof String) {
         result = createTextMessage(messageId, (String) value, coreMessageObjectPools);

         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, value == null ? AMQP_VALUE_NULL : AMQP_VALUE_STRING);
      } else if (value instanceof Binary binary) {
         if (isContentType(SERIALIZED_JAVA_OBJECT_CONTENT_TYPE.toString(), contentType)) {
            result = createObjectMessage(messageId, binary, coreMessageObjectPools);
         } else {
            result = createBytesMessage(messageId, binary.getArray(), binary.getArrayOffset(), binary.getLength(), coreMessageObjectPools);
         }

         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_BINARY);
      } else if (value instanceof List) {
         CoreStreamMessageWrapper m = createStreamMessage(messageId, coreMessageObjectPools);
         // Unchecked cast: the element type is not verified, each item is written as a plain Object.
         for (Object item : (List<Object>) value) {
            m.writeObject(item);
         }
         result = m;
         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_LIST);
      } else if (value instanceof Map) {
         // Unchecked cast: the key type is not verified here.
         // NOTE(review): presumably createMapMessage expects String keys - confirm non-String keys are handled.
         result = createMapMessage(messageId, (Map<String, Object>) value, coreMessageObjectPools);
         result.setShortProperty(JMS_AMQP_ORIGINAL_ENCODING, AMQP_VALUE_MAP);
      } else {
         // Any other value type: encode the whole body section (not just its value) using the encoder
         // obtained from TLSEncode and carry the encoded bytes in a bytes message.
         // NOTE(review): unlike the other branches no JMS_AMQP_ORIGINAL_ENCODING is recorded here - confirm
         // that this is intended.
         ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
         try {
            TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
            TLSEncode.getEncoder().writeObject(body);
            // A pooled heap buffer can be a region of a larger shared backing array, so the written bytes
            // start at arrayOffset() and not at index 0 of array().
            // Presumably createBytesMessage copies the bytes, since the buffer is released right after - verify.
            result = createBytesMessage(messageId, buf.array(), buf.arrayOffset() + buf.readerIndex(), buf.readableBytes(), coreMessageObjectPools);
         } finally {
            buf.release();
            // Detach the released buffer from the encoder so that it cannot be written to afterwards.
            TLSEncode.getEncoder().setByteBuffer((WritableBuffer)null);
         }
      }
   } else {
      throw new RuntimeException("Unexpected body type: " + body.getClass());
   }

   // Initialize the JMS expiration with the value calculated during the scan of the AMQP message
   // to avoid a different value for each conversion based on System.currentTimeMillis() and ttl.
   result.setJMSExpiration(message.getExpiration());

   processHeader(result, header);
   processMessageAnnotations(result, annotations);
   processApplicationProperties(result, applicationProperties);
   processProperties(result, properties, annotations);
   processFooter(result, footer);
   processExtraProperties(result, message.getExtraProperties());

   // If the JMS expiration has not yet been set...
   if (header != null && result.getExpiration() == 0) {
      // Then lets try to set it based on the message TTL.
      long ttl = MESSAGE_DEFAULT_TIME_TO_LIVE;
      if (header.getTtl() != null) {
         ttl = header.getTtl().longValue();
      }

      if (ttl == 0) {
         // A TTL of zero leaves the expiration at zero.
         result.setJMSExpiration(0);
      } else {
         result.setJMSExpiration(System.currentTimeMillis() + ttl);
      }
   }

   // Copy the routing related attributes from the source message onto the wrapped Core message.
   result.getInnerMessage().setDurable(message.isDurable());
   result.getInnerMessage().setPriority(message.getPriority());
   result.getInnerMessage().setAddress(message.getAddressSimpleString());
   result.getInnerMessage().setRoutingType(message.getRoutingType());

   result.encode();

   return result.getInnerMessage();
}