in activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java [380:508]
private Section convertBody(ActiveMQMessage message) throws JMSException {
Section body = null;
short orignalEncoding = AMQP_UNKNOWN;
try {
orignalEncoding = message.getShortProperty(JMS_AMQP_ORIGINAL_ENCODING);
} catch (Exception ex) {
// Ignore and stick with UNKNOWN
}
int messageType = message.getDataStructureType();
if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
Object data = message.getDataStructure();
if (data instanceof ConnectionInfo) {
ConnectionInfo connectionInfo = (ConnectionInfo)data;
final HashMap<String, Object> connectionMap = new LinkedHashMap<String, Object>();
connectionMap.put("ConnectionId", connectionInfo.getConnectionId().getValue());
connectionMap.put("ClientId", connectionInfo.getClientId());
connectionMap.put("ClientIp", connectionInfo.getClientIp());
connectionMap.put("UserName", connectionInfo.getUserName());
connectionMap.put("BrokerMasterConnector", connectionInfo.isBrokerMasterConnector());
connectionMap.put("Manageable", connectionInfo.isManageable());
connectionMap.put("ClientMaster", connectionInfo.isClientMaster());
connectionMap.put("FaultTolerant", connectionInfo.isFaultTolerant());
connectionMap.put("FailoverReconnect", connectionInfo.isFailoverReconnect());
body = new AmqpValue(connectionMap);
} else if (data instanceof RemoveInfo) {
RemoveInfo removeInfo = (RemoveInfo)message.getDataStructure();
final HashMap<String, Object> removeMap = new LinkedHashMap<String, Object>();
if (removeInfo.isConnectionRemove()) {
removeMap.put(ConnectionId.class.getSimpleName(), ((ConnectionId)removeInfo.getObjectId()).getValue());
} else if (removeInfo.isConsumerRemove()) {
removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue());
removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId());
removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId());
removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId().getValue());
}
body = new AmqpValue(removeMap);
}
} else if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message);
if (payload == null) {
payload = EMPTY_BINARY;
}
switch (orignalEncoding) {
case AMQP_NULL:
break;
case AMQP_VALUE_BINARY:
body = new AmqpValue(payload);
break;
case AMQP_DATA:
case AMQP_UNKNOWN:
default:
body = new Data(payload);
break;
}
} else if (messageType == CommandTypes.ACTIVEMQ_TEXT_MESSAGE) {
switch (orignalEncoding) {
case AMQP_NULL:
break;
case AMQP_DATA:
body = new Data(getBinaryFromMessageBody((ActiveMQTextMessage) message));
break;
case AMQP_VALUE_STRING:
case AMQP_UNKNOWN:
default:
body = new AmqpValue(((TextMessage) message).getText());
break;
}
} else if (messageType == CommandTypes.ACTIVEMQ_MAP_MESSAGE) {
body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message));
} else if (messageType == CommandTypes.ACTIVEMQ_STREAM_MESSAGE) {
ArrayList<Object> list = new ArrayList<>();
final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message;
try {
while (true) {
list.add(m.readObject());
}
} catch (MessageEOFException e) {
}
switch (orignalEncoding) {
case AMQP_SEQUENCE:
body = new AmqpSequence(list);
break;
case AMQP_VALUE_LIST:
case AMQP_UNKNOWN:
default:
body = new AmqpValue(list);
break;
}
} else if (messageType == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
Binary payload = getBinaryFromMessageBody((ActiveMQObjectMessage) message);
if (payload == null) {
payload = EMPTY_BINARY;
}
switch (orignalEncoding) {
case AMQP_VALUE_BINARY:
body = new AmqpValue(payload);
break;
case AMQP_DATA:
case AMQP_UNKNOWN:
default:
body = new Data(payload);
break;
}
// For a non-AMQP message we tag the outbound content type as containing
// a serialized Java object so that an AMQP client has a hint as to what
// we are sending it.
if (!message.propertyExists(JMS_AMQP_CONTENT_TYPE)) {
message.setReadOnlyProperties(false);
message.setStringProperty(JMS_AMQP_CONTENT_TYPE, SERIALIZED_JAVA_OBJECT_CONTENT_TYPE);
message.setReadOnlyProperties(true);
}
}
return body;
}