private Section convertBody()

in activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java [380:508]


    /**
     * Builds the AMQP body section for the given message.
     *
     * The section type is chosen from the message's data structure type and,
     * when readable, the encoding recorded in the
     * {@code JMS_AMQP_ORIGINAL_ENCODING} property. If that property cannot be
     * read the encoding is treated as {@code AMQP_UNKNOWN}.
     *
     * @param message the message whose content is converted
     * @return the body section, or {@code null} when no body is produced (for
     *         example an {@code AMQP_NULL} encoding on a bytes or text message,
     *         an unhandled message type, or a generic message whose data
     *         structure is neither a {@code ConnectionInfo} nor a
     *         {@code RemoveInfo})
     * @throws JMSException if one of the message accessors used here fails
     */
    private Section convertBody(ActiveMQMessage message) throws JMSException {

        Section body = null;
        short originalEncoding = AMQP_UNKNOWN;

        try {
            originalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING);
        } catch (Exception ignored) {
            // Ignore and stick with UNKNOWN
        }

        int messageType = message.getDataStructureType();

        if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
            // Only ConnectionInfo and RemoveInfo payloads are mapped; anything
            // else leaves the body null.
            Object data = message.getDataStructure();
            if (data instanceof ConnectionInfo) {
                body = new AmqpValue(connectionInfoToMap((ConnectionInfo) data));
            } else if (data instanceof RemoveInfo) {
                body = new AmqpValue(removeInfoToMap((RemoveInfo) data));
            }
        } else if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
            Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message);

            if (payload == null) {
                payload = EMPTY_BINARY;
            }

            switch (originalEncoding) {
                case AMQP_NULL:
                    break;
                case AMQP_VALUE_BINARY:
                    body = new AmqpValue(payload);
                    break;
                case AMQP_DATA:
                case AMQP_UNKNOWN:
                default:
                    body = new Data(payload);
                    break;
            }
        } else if (messageType == CommandTypes.ACTIVEMQ_TEXT_MESSAGE) {
            switch (originalEncoding) {
                case AMQP_NULL:
                    break;
                case AMQP_DATA:
                    body = new Data(getBinaryFromMessageBody((ActiveMQTextMessage) message));
                    break;
                case AMQP_VALUE_STRING:
                case AMQP_UNKNOWN:
                default:
                    body = new AmqpValue(((TextMessage) message).getText());
                    break;
            }
        } else if (messageType == CommandTypes.ACTIVEMQ_MAP_MESSAGE) {
            body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message));
        } else if (messageType == CommandTypes.ACTIVEMQ_STREAM_MESSAGE) {
            ArrayList<Object> list = new ArrayList<>();
            final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message;
            try {
                while (true) {
                    list.add(m.readObject());
                }
            } catch (MessageEOFException endOfStream) {
                // Expected: this exception is the only exit from the read loop,
                // so everything read up to this point is already in the list.
            }

            switch (originalEncoding) {
                case AMQP_SEQUENCE:
                    body = new AmqpSequence(list);
                    break;
                case AMQP_VALUE_LIST:
                case AMQP_UNKNOWN:
                default:
                    body = new AmqpValue(list);
                    break;
            }
        } else if (messageType == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
            Binary payload = getBinaryFromMessageBody((ActiveMQObjectMessage) message);

            if (payload == null) {
                payload = EMPTY_BINARY;
            }

            switch (originalEncoding) {
                case AMQP_VALUE_BINARY:
                    body = new AmqpValue(payload);
                    break;
                case AMQP_DATA:
                case AMQP_UNKNOWN:
                default:
                    body = new Data(payload);
                    break;
            }

            // For a non-AMQP message we tag the outbound content type as containing
            // a serialized Java object so that an AMQP client has a hint as to what
            // we are sending it.
            if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) {
                // The read-only flag is lifted just long enough to set the property.
                message.setReadOnlyProperties(false);
                message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
                message.setReadOnlyProperties(true);
            }
        }

        return body;
    }

    /**
     * Copies the connection details of a {@code ConnectionInfo} into an
     * insertion-ordered map.
     *
     * @param connectionInfo the connection details to copy
     * @return a new map holding the copied values
     */
    private HashMap<String, Object> connectionInfoToMap(ConnectionInfo connectionInfo) {
        final HashMap<String, Object> connectionMap = new LinkedHashMap<>();

        connectionMap.put("ConnectionId", connectionInfo.getConnectionId().getValue());
        connectionMap.put("ClientId", connectionInfo.getClientId());
        connectionMap.put("ClientIp", connectionInfo.getClientIp());
        connectionMap.put("UserName", connectionInfo.getUserName());
        connectionMap.put("BrokerMasterConnector", connectionInfo.isBrokerMasterConnector());
        connectionMap.put("Manageable", connectionInfo.isManageable());
        connectionMap.put("ClientMaster", connectionInfo.isClientMaster());
        connectionMap.put("FaultTolerant", connectionInfo.isFaultTolerant());
        connectionMap.put("FailoverReconnect", connectionInfo.isFailoverReconnect());

        return connectionMap;
    }

    /**
     * Copies the identifiers of a removed connection or consumer into an
     * insertion-ordered map; any other kind of removal yields an empty map.
     *
     * @param removeInfo the removal command to describe
     * @return a new map holding the copied identifiers, possibly empty
     */
    private HashMap<String, Object> removeInfoToMap(RemoveInfo removeInfo) {
        final HashMap<String, Object> removeMap = new LinkedHashMap<>();

        if (removeInfo.isConnectionRemove()) {
            final ConnectionId connectionId = (ConnectionId) removeInfo.getObjectId();
            removeMap.put(ConnectionId.class.getSimpleName(), connectionId.getValue());
        } else if (removeInfo.isConsumerRemove()) {
            // Look up and cast the object id once instead of once per entry.
            final ConsumerId consumerId = (ConsumerId) removeInfo.getObjectId();
            removeMap.put(ConsumerId.class.getSimpleName(), consumerId.getValue());
            removeMap.put("SessionId", consumerId.getSessionId());
            removeMap.put("ConnectionId", consumerId.getConnectionId());
            removeMap.put("ParentId", consumerId.getParentId().getValue());
        }

        return removeMap;
    }