public EncodedMessage transform()

in activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java [123:378]


    /**
     * Encodes the given ActiveMQ message into an AMQP encoded message.
     * <p>
     * The message's standard fields (persistence, priority, type, message id,
     * destination, reply-to, correlation id, expiration, timestamp, redelivery
     * counter, user id, group id and group sequence) are copied into the AMQP
     * Header and Properties sections. Message properties whose names start with
     * the JMS_AMQP prefix are routed to the matching AMQP section (header,
     * properties, delivery annotations, message annotations or footer), scheduled
     * message properties are dropped, and everything else is written to the
     * Application Properties section. Sections are only created, and therefore
     * only encoded, when there is something that requires them.
     *
     * @param message
     *        the message to transform, may be {@code null}.
     *
     * @return the encoded form of the message, or {@code null} if the given
     *         message was {@code null}.
     *
     * @throws Exception if the message properties cannot be read or the message
     *         cannot be converted or encoded.
     */
    public EncodedMessage transform(ActiveMQMessage message) throws Exception {
        if (message == null) {
            return null;
        }

        long messageFormat = 0;
        Header header = null;
        Properties properties = null;
        Map<Symbol, Object> daMap = null;
        Map<Symbol, Object> maMap = null;
        Map<String, Object> apMap = null;
        Map<Object, Object> footerMap = null;

        Section body = convertBody(message);

        if (message.isPersistent()) {
            header = new Header();
            header.setDurable(true);
        }
        byte priority = message.getPriority();
        if (priority != Message.DEFAULT_PRIORITY) {
            if (header == null) {
                header = new Header();
            }
            header.setPriority(UnsignedByte.valueOf(priority));
        }
        String type = message.getType();
        if (type != null) {
            properties = new Properties();
            properties.setSubject(type);
        }
        MessageId messageId = message.getMessageId();
        if (messageId != null) {
            if (properties == null) {
                properties = new Properties();
            }
            properties.setMessageId(getOriginalMessageId(message));
        }
        ActiveMQDestination destination = message.getDestination();
        if (destination != null) {
            if (properties == null) {
                properties = new Properties();
            }
            properties.setTo(destination.getQualifiedName());
            if (maMap == null) {
                maMap = new HashMap<>();
            }
            maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
        }
        ActiveMQDestination replyTo = message.getReplyTo();
        if (replyTo != null) {
            if (properties == null) {
                properties = new Properties();
            }
            properties.setReplyTo(replyTo.getQualifiedName());
            if (maMap == null) {
                maMap = new HashMap<>();
            }
            maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
        }
        String correlationId = message.getCorrelationId();
        if (correlationId != null) {
            if (properties == null) {
                properties = new Properties();
            }
            try {
                properties.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
            } catch (AmqpProtocolException e) {
                // The value could not be turned into an AMQP id object, so the
                // correlation id is sent as the plain String instead.
                properties.setCorrelationId(correlationId);
            }
        }
        long expiration = message.getExpiration();
        if (expiration != 0) {
            // Largest value that fits the unsigned 32 bit ttl field.
            final long maxTtl = 0xFFFFFFFFL;

            long ttl = expiration - System.currentTimeMillis();
            if (ttl < 0) {
                ttl = 1;
            } else if (ttl > maxTtl) {
                // Without this clamp the narrowing cast below would silently wrap a
                // long lived message around to an arbitrary, much shorter ttl. The
                // exact expiration is still carried by the absolute expiry time.
                ttl = maxTtl;
            }

            if (header == null) {
                header = new Header();
            }
            // ttl is now within [0, 2^32 - 1] so the cast keeps all significant bits.
            header.setTtl(new UnsignedInteger((int) ttl));

            if (properties == null) {
                properties = new Properties();
            }
            properties.setAbsoluteExpiryTime(new Date(expiration));
        }
        long timeStamp = message.getTimestamp();
        if (timeStamp != 0) {
            if (properties == null) {
                properties = new Properties();
            }
            properties.setCreationTime(new Date(timeStamp));
        }

        // JMSX Message Properties
        int deliveryCount = message.getRedeliveryCounter();
        if (deliveryCount > 0) {
            if (header == null) {
                header = new Header();
            }
            header.setDeliveryCount(UnsignedInteger.valueOf(deliveryCount));
        }
        String userId = message.getUserID();
        if (userId != null) {
            if (properties == null) {
                properties = new Properties();
            }
            properties.setUserId(new Binary(userId.getBytes(StandardCharsets.UTF_8)));
        }
        String groupId = message.getGroupID();
        if (groupId != null) {
            if (properties == null) {
                properties = new Properties();
            }
            properties.setGroupId(groupId);
        }
        int groupSequence = message.getGroupSequence();
        if (groupSequence > 0) {
            if (properties == null) {
                properties = new Properties();
            }
            properties.setGroupSequence(UnsignedInteger.valueOf(groupSequence));
        }

        final Map<String, Object> entries;
        try {
            entries = message.getProperties();
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }

        for (Map.Entry<String, Object> entry : entries.entrySet()) {
            String key = entry.getKey();
            Object value = entry.getValue();

            if (key.startsWith(JMS_AMQP_PREFIX)) {
                if (key.startsWith(NATIVE, JMS_AMQP_PREFIX_LENGTH)
                    || key.startsWith(ORIGINAL_ENCODING, JMS_AMQP_PREFIX_LENGTH)) {
                    // skip transformer appended properties
                    continue;
                } else if (key.startsWith(MESSAGE_FORMAT, JMS_AMQP_PREFIX_LENGTH)) {
                    messageFormat = (long) TypeConversionSupport.convert(value, Long.class);
                    continue;
                } else if (key.startsWith(HEADER, JMS_AMQP_PREFIX_LENGTH)) {
                    // Only ensures that a Header section gets encoded, the
                    // property value itself is not used.
                    if (header == null) {
                        header = new Header();
                    }
                    continue;
                } else if (key.startsWith(PROPERTIES, JMS_AMQP_PREFIX_LENGTH)) {
                    // Only ensures that a Properties section gets encoded, the
                    // property value itself is not used.
                    if (properties == null) {
                        properties = new Properties();
                    }
                    continue;
                } else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
                    if (maMap == null) {
                        maMap = new HashMap<>();
                    }
                    String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
                    maMap.put(Symbol.valueOf(name), value);
                    continue;
                } else if (key.startsWith(FIRST_ACQUIRER, JMS_AMQP_PREFIX_LENGTH)) {
                    if (header == null) {
                        header = new Header();
                    }
                    header.setFirstAcquirer((boolean) TypeConversionSupport.convert(value, Boolean.class));
                    continue;
                } else if (key.startsWith(CONTENT_TYPE, JMS_AMQP_PREFIX_LENGTH)) {
                    if (properties == null) {
                        properties = new Properties();
                    }
                    properties.setContentType(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class)));
                    continue;
                } else if (key.startsWith(CONTENT_ENCODING, JMS_AMQP_PREFIX_LENGTH)) {
                    if (properties == null) {
                        properties = new Properties();
                    }
                    properties.setContentEncoding(Symbol.getSymbol((String) TypeConversionSupport.convert(value, String.class)));
                    continue;
                } else if (key.startsWith(REPLYTO_GROUP_ID, JMS_AMQP_PREFIX_LENGTH)) {
                    if (properties == null) {
                        properties = new Properties();
                    }
                    properties.setReplyToGroupId((String) TypeConversionSupport.convert(value, String.class));
                    continue;
                } else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
                    if (daMap == null) {
                        daMap = new HashMap<>();
                    }
                    String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
                    daMap.put(Symbol.valueOf(name), value);
                    continue;
                } else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
                    if (footerMap == null) {
                        footerMap = new HashMap<>();
                    }
                    String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
                    footerMap.put(Symbol.valueOf(name), value);
                    continue;
                }
                // A prefixed key that matched none of the above falls through and
                // is stored as an application property.
            } else if (key.startsWith(AMQ_SCHEDULED_MESSAGE_PREFIX)) {
                // strip off the scheduled message properties
                continue;
            }

            // The property didn't map into any other slot so we store it in the
            // Application Properties section of the message.
            if (apMap == null) {
                apMap = new HashMap<>();
            }
            apMap.put(key, value);
        }

        // Type of command to recognize advisory message. The value does not depend
        // on the individual property, so it is resolved once here rather than on
        // every loop iteration. It is only added when at least one application
        // property was stored, and it replaces any property of the same name.
        if (apMap != null && message.getDataStructureType() == CommandTypes.ACTIVEMQ_MESSAGE) {
            Object data = message.getDataStructure();
            if (data != null) {
                apMap.put("ActiveMqDataStructureType", data.getClass().getSimpleName());
            }
        }

        final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
        encoder.setByteBuffer(buffer);

        // Sections are written in this fixed order, absent sections are skipped.
        if (header != null) {
            encoder.writeObject(header);
        }
        if (daMap != null) {
            encoder.writeObject(new DeliveryAnnotations(daMap));
        }
        if (maMap != null) {
            encoder.writeObject(new MessageAnnotations(maMap));
        }
        if (properties != null) {
            encoder.writeObject(properties);
        }
        if (apMap != null) {
            encoder.writeObject(new ApplicationProperties(apMap));
        }
        if (body != null) {
            encoder.writeObject(body);
        }
        if (footerMap != null) {
            encoder.writeObject(new Footer(footerMap));
        }

        return new EncodedMessage(messageFormat, buffer.getArray(), 0, buffer.getArrayLength());
    }