in activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java [123:378]
/**
 * Encodes the given ActiveMQ message as an AMQP message.
 *
 * <p>The standard message attributes (persistence, priority, type, message id,
 * destination, reply-to, correlation id, expiration, timestamp, redelivery
 * counter, user id, group id and group sequence) are copied into the AMQP
 * Header, Properties and Message Annotations sections. The message properties
 * are then walked: keys carrying the JMS_AMQP prefix are routed into the
 * matching AMQP section (or dropped when they are transformer bookkeeping),
 * scheduler properties are stripped, and everything else lands in the
 * Application Properties section. Sections that received no data are not
 * written at all.
 *
 * @param message the message to encode; may be {@code null}
 * @return the encoded message, or {@code null} when {@code message} is {@code null}
 * @throws Exception if the body cannot be converted, the message properties
 *         cannot be read, or encoding fails
 */
public EncodedMessage transform(ActiveMQMessage message) throws Exception {
    if (message == null) {
        return null;
    }

    // Largest value representable by an AMQP uint; used to cap the header TTL.
    final long maxUnsignedInt = 0xFFFFFFFFL;

    long messageFormat = 0;
    // Every section is created lazily so that empty sections are never encoded.
    Header header = null;
    Properties properties = null;
    Map<Symbol, Object> daMap = null;
    Map<Symbol, Object> maMap = null;
    Map<String, Object> apMap = null;
    Map<Object, Object> footerMap = null;

    Section body = convertBody(message);

    if (message.isPersistent()) {
        if (header == null) {
            header = new Header();
        }
        header.setDurable(true);
    }

    byte priority = message.getPriority();
    if (priority != Message.DEFAULT_PRIORITY) {
        if (header == null) {
            header = new Header();
        }
        header.setPriority(UnsignedByte.valueOf(priority));
    }

    String type = message.getType();
    if (type != null) {
        if (properties == null) {
            properties = new Properties();
        }
        properties.setSubject(type);
    }

    MessageId messageId = message.getMessageId();
    if (messageId != null) {
        if (properties == null) {
            properties = new Properties();
        }
        properties.setMessageId(getOriginalMessageId(message));
    }

    ActiveMQDestination destination = message.getDestination();
    if (destination != null) {
        if (properties == null) {
            properties = new Properties();
        }
        properties.setTo(destination.getQualifiedName());
        if (maMap == null) {
            maMap = new HashMap<>();
        }
        maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
    }

    ActiveMQDestination replyTo = message.getReplyTo();
    if (replyTo != null) {
        if (properties == null) {
            properties = new Properties();
        }
        properties.setReplyTo(replyTo.getQualifiedName());
        if (maMap == null) {
            maMap = new HashMap<>();
        }
        maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
    }

    String correlationId = message.getCorrelationId();
    if (correlationId != null) {
        if (properties == null) {
            properties = new Properties();
        }
        try {
            properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
        } catch (AmqpProtocolException e) {
            // The helper could not turn the string into an id object; send the raw string instead.
            properties.setCorrelationId(correlationId);
        }
    }

    long expiration = message.getExpiration();
    if (expiration != 0) {
        long ttl = expiration - System.currentTimeMillis();
        if (ttl < 0) {
            // Already past its expiration: send the smallest positive TTL.
            ttl = 1;
        } else if (ttl > maxUnsignedInt) {
            // Cap instead of letting the int narrowing below wrap around, which
            // would turn a far-future expiration into an arbitrary short TTL.
            ttl = maxUnsignedInt;
        }
        if (header == null) {
            header = new Header();
        }
        // The narrowing cast keeps the low 32 bits, so values above
        // Integer.MAX_VALUE are carried as a negative int bit pattern.
        header.setTtl(new UnsignedInteger((int) ttl));
        if (properties == null) {
            properties = new Properties();
        }
        properties.setAbsoluteExpiryTime(new Date(expiration));
    }

    long timeStamp = message.getTimestamp();
    if (timeStamp != 0) {
        if (properties == null) {
            properties = new Properties();
        }
        properties.setCreationTime(new Date(timeStamp));
    }

    // JMSX Message Properties
    int deliveryCount = message.getRedeliveryCounter();
    if (deliveryCount > 0) {
        if (header == null) {
            header = new Header();
        }
        header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount));
    }

    String userId = message.getUserID();
    if (userId != null) {
        if (properties == null) {
            properties = new Properties();
        }
        properties.setUserId(new Binary(userId.getBytes(StandardCharsets.UTF_8)));
    }

    String groupId = message.getGroupID();
    if (groupId != null) {
        if (properties == null) {
            properties = new Properties();
        }
        properties.setGroupId(groupId);
    }

    int groupSequence = message.getGroupSequence();
    if (groupSequence > 0) {
        if (properties == null) {
            properties = new Properties();
        }
        properties.setGroupSequence(UnsignedInteger.valueOf(groupSequence));
    }

    final Map<String, Object> entries;
    try {
        entries = message.getProperties();
    } catch (IOException e) {
        throw JMSExceptionSupport.create(e);
    }

    for (Map.Entry<String, Object> entry : entries.entrySet()) {
        String key = entry.getKey();
        Object value = entry.getValue();

        if (key.startsWith(JMS_AMQP_PREFIX)) {
            // The order of these checks is significant and must be preserved:
            // each one matches on the text that follows the common prefix.
            if (key.startsWith(NATIVE, JMS_AMQP_PREFIX_LENGTH)) {
                // skip transformer appended properties
                continue;
            } else if (key.startsWith(ORIGINAL_ENCODING, JMS_AMQP_PREFIX_LENGTH)) {
                // skip transformer appended properties
                continue;
            } else if (key.startsWith(MESSAGE_FORMAT, JMS_AMQP_PREFIX_LENGTH)) {
                messageFormat = (long) TypeConversionSupport.convert(value, Long.class);
                continue;
            } else if (key.startsWith(HEADER, JMS_AMQP_PREFIX_LENGTH)) {
                // Only forces a Header section to be emitted; the value is not copied.
                if (header == null) {
                    header = new Header();
                }
                continue;
            } else if (key.startsWith(PROPERTIES, JMS_AMQP_PREFIX_LENGTH)) {
                // Only forces a Properties section to be emitted; the value is not copied.
                if (properties == null) {
                    properties = new Properties();
                }
                continue;
            } else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
                if (maMap == null) {
                    maMap = new HashMap<>();
                }
                String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
                maMap.put(Symbol.valueOf(name), value);
                continue;
            } else if (key.startsWith(FIRST_ACQUIRER, JMS_AMQP_PREFIX_LENGTH)) {
                if (header == null) {
                    header = new Header();
                }
                header.setFirstAcquirer((boolean) TypeConversionSupport.convert(value, Boolean.class));
                continue;
            } else if (key.startsWith(CONTENT_TYPE, JMS_AMQP_PREFIX_LENGTH)) {
                if (properties == null) {
                    properties = new Properties();
                }
                properties.setContentType(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class)));
                continue;
            } else if (key.startsWith(CONTENT_ENCODING, JMS_AMQP_PREFIX_LENGTH)) {
                if (properties == null) {
                    properties = new Properties();
                }
                properties.setContentEncoding(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class)));
                continue;
            } else if (key.startsWith(REPLYTO_GROUP_ID, JMS_AMQP_PREFIX_LENGTH)) {
                if (properties == null) {
                    properties = new Properties();
                }
                properties.setReplyToGroupId((String) TypeConversionSupport.convert(value, String.class));
                continue;
            } else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
                if (daMap == null) {
                    daMap = new HashMap<>();
                }
                String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
                daMap.put(Symbol.valueOf(name), value);
                continue;
            } else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
                if (footerMap == null) {
                    footerMap = new HashMap<>();
                }
                String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
                footerMap.put(Symbol.valueOf(name), value);
                continue;
            }
            // A prefixed key that matched none of the above falls through and is
            // treated as an ordinary application property.
        } else if (key.startsWith(AMQ_SCHEDULED_MESSAGE_PREFIX)) {
            // strip off the scheduled message properties
            continue;
        }

        // The property didn't map into any other slot so we store it in the
        // Application Properties section of the message.
        if (apMap == null) {
            apMap = new HashMap<>();
        }
        apMap.put(key, value);
    }

    // Type of command to recognize advisory message. This does not depend on the
    // individual property, so it is evaluated once here rather than per entry.
    // It is still only added when at least one application property was stored,
    // and it overrides any application property that uses the same key.
    if (apMap != null && message.getDataStructureType() == CommandTypes.ACTIVEMQ_MESSAGE) {
        Object data = message.getDataStructure();
        if (data != null) {
            apMap.put("ActiveMqDataStructureType", data.getClass().getSimpleName());
        }
    }

    // Write the populated sections in a fixed order: header, delivery annotations,
    // message annotations, properties, application properties, body, footer.
    final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
    encoder.setByteBuffer(buffer);

    if (header != null) {
        encoder.writeObject(header);
    }
    if (daMap != null) {
        encoder.writeObject(new DeliveryAnnotations(daMap));
    }
    if (maMap != null) {
        encoder.writeObject(new MessageAnnotations(maMap));
    }
    if (properties != null) {
        encoder.writeObject(properties);
    }
    if (apMap != null) {
        encoder.writeObject(new ApplicationProperties(apMap));
    }
    if (body != null) {
        encoder.writeObject(body);
    }
    if (footerMap != null) {
        encoder.writeObject(new Footer(footerMap));
    }

    return new EncodedMessage(messageFormat, buffer.getArray(), 0, buffer.getArrayLength());
}