in activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java [87:224]
/**
 * Copies the header, message annotations, application properties, properties and footer
 * sections of an inbound AMQP message onto the given ActiveMQ message.
 *
 * <p>Sections that are absent are skipped. A section whose map value is {@code null} is
 * treated the same as an empty section. When no header is present the message is marked
 * non-persistent with the default priority.
 *
 * @param jms  the ActiveMQ message to populate; it is mutated in place
 * @param amqp the decoded AMQP message to read from
 * @throws Exception if a value cannot be applied to {@code jms}, for example when a
 *                   scheduling annotation carries a value of an unexpected type
 */
protected void populateMessage(ActiveMQMessage jms, org.apache.qpid.proton.message.Message amqp) throws Exception {
    // --- Header section: durability, priority, first-acquirer flag, delivery count ---
    final Header header = amqp.getHeader();
    if (header != null) {
        // Marker property recording that the AMQP message carried a header section.
        jms.setBooleanProperty(JMS_AMQP_HEADER, true);

        if (header.getDurable() != null) {
            jms.setPersistent(header.getDurable());
        } else {
            jms.setPersistent(false);
        }

        if (header.getPriority() != null) {
            jms.setJMSPriority(header.getPriority().intValue());
        } else {
            jms.setPriority((byte) Message.DEFAULT_PRIORITY);
        }

        if (header.getFirstAcquirer() != null) {
            jms.setBooleanProperty(JMS_AMQP_FIRST_ACQUIRER, header.getFirstAcquirer());
        }

        if (header.getDeliveryCount() != null) {
            jms.setRedeliveryCounter(header.getDeliveryCount().intValue());
        }
    } else {
        // No header at all: fall back to default priority and non-persistent delivery.
        jms.setPriority((byte) Message.DEFAULT_PRIORITY);
        jms.setPersistent(false);
    }

    // --- Message annotations: scheduling hints plus a prefixed copy of every annotation ---
    final MessageAnnotations ma = amqp.getMessageAnnotations();
    // The section object may exist while wrapping a null map; treat that as "no annotations".
    if (ma != null && ma.getValue() != null) {
        for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
            final String key = entry.getKey().toString();
            final Object value = entry.getValue();

            // Scheduling annotations are only interpreted when they carry a value;
            // non-positive numeric values are ignored.
            if (value != null) {
                if ("x-opt-delivery-time".equals(key)) {
                    // Value is compared against the current wall clock (milliseconds) and
                    // converted into a relative delay; times already in the past are ignored.
                    long deliveryTime = ((Number) value).longValue();
                    long delay = deliveryTime - System.currentTimeMillis();
                    if (delay > 0) {
                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                    }
                } else if ("x-opt-delivery-delay".equals(key)) {
                    long delay = ((Number) value).longValue();
                    if (delay > 0) {
                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                    }
                } else if ("x-opt-delivery-repeat".equals(key)) {
                    int repeat = ((Number) value).intValue();
                    if (repeat > 0) {
                        jms.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
                    }
                } else if ("x-opt-delivery-period".equals(key)) {
                    long period = ((Number) value).longValue();
                    if (period > 0) {
                        jms.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
                    }
                } else if ("x-opt-delivery-cron".equals(key)) {
                    jms.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, (String) value);
                }
            }

            // Every annotation (including the scheduling ones and null-valued ones) is
            // also passed to setProperty under the annotation prefix.
            setProperty(jms, JMS_AMQP_MESSAGE_ANNOTATION_PREFIX + key, value);
        }
    }

    // --- Application properties: copied across under their own names ---
    final ApplicationProperties ap = amqp.getApplicationProperties();
    if (ap != null && ap.getValue() != null) {
        for (Map.Entry<String, Object> entry : ((Map<String, Object>) ap.getValue()).entrySet()) {
            setProperty(jms, entry.getKey(), entry.getValue());
        }
    }

    // --- Properties section: ids, addressing, content metadata, grouping, expiry ---
    final Properties properties = amqp.getProperties();
    if (properties != null) {
        // Marker property recording that the AMQP message carried a properties section.
        jms.setBooleanProperty(JMS_AMQP_PROPERTIES, true);

        if (properties.getMessageId() != null) {
            jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
        }

        // The user id arrives as raw bytes; it is decoded as UTF-8 text.
        Binary userId = properties.getUserId();
        if (userId != null) {
            jms.setUserID(new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), StandardCharsets.UTF_8));
        }

        if (properties.getTo() != null) {
            jms.setDestination((ActiveMQDestination.createDestination(properties.getTo(), ActiveMQDestination.QUEUE_TYPE)));
        }
        if (properties.getSubject() != null) {
            jms.setType(properties.getSubject());
        }
        if (properties.getReplyTo() != null) {
            jms.setReplyTo((ActiveMQDestination.createDestination(properties.getReplyTo(), ActiveMQDestination.QUEUE_TYPE)));
        }
        if (properties.getCorrelationId() != null) {
            jms.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
        }
        if (properties.getContentType() != null) {
            jms.setStringProperty(JMS_AMQP_CONTENT_TYPE, properties.getContentType().toString());
        }
        if (properties.getContentEncoding() != null) {
            jms.setStringProperty(JMS_AMQP_CONTENT_ENCODING, properties.getContentEncoding().toString());
        }
        if (properties.getCreationTime() != null) {
            jms.setTimestamp(properties.getCreationTime().getTime());
        }
        if (properties.getGroupId() != null) {
            jms.setGroupID(properties.getGroupId());
        }
        if (properties.getGroupSequence() != null) {
            jms.setGroupSequence(properties.getGroupSequence().intValue());
        }
        if (properties.getReplyToGroupId() != null) {
            jms.setStringProperty(JMS_AMQP_REPLYTO_GROUP_ID, properties.getReplyToGroupId());
        }
        if (properties.getAbsoluteExpiryTime() != null) {
            jms.setExpiration(properties.getAbsoluteExpiryTime().getTime());
        }
    }

    // If the expiration was not set from the absolute expiry time above, derive it from
    // the header TTL (relative to now). A TTL equal to the default leaves it unset.
    if (header != null && jms.getJMSExpiration() == 0) {
        long ttl = Message.DEFAULT_TIME_TO_LIVE;
        if (header.getTtl() != null) {
            ttl = header.getTtl().longValue();
        }

        if (ttl != jakarta.jms.Message.DEFAULT_TIME_TO_LIVE) {
            jms.setExpiration(System.currentTimeMillis() + ttl);
        }
    }

    // --- Footer section: every entry is passed to setProperty under the footer prefix ---
    final Footer fp = amqp.getFooter();
    if (fp != null && fp.getValue() != null) {
        for (Map.Entry<Object, Object> entry : (Set<Map.Entry<Object, Object>>) fp.getValue().entrySet()) {
            String key = entry.getKey().toString();
            setProperty(jms, JMS_AMQP_FOOTER_PREFIX + key, entry.getValue());
        }
    }
}