in systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java [248:349]
public Object performOperationUsingAmqpManagement(final String name,
final String type,
final String operation,
final Map<String, Object> arguments,
final Session session)
throws JMSException
{
Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
MapMessage opMessage = session.createMapMessage();
opMessage.setStringProperty("type", type);
opMessage.setStringProperty("operation", operation);
opMessage.setStringProperty("index", "object-path");
opMessage.setJMSReplyTo(replyToDestination);
opMessage.setStringProperty("key", name);
for (Map.Entry<String, Object> argument : arguments.entrySet())
{
Object value = argument.getValue();
if (value.getClass().isPrimitive() || value instanceof String)
{
opMessage.setObjectProperty(argument.getKey(), value);
}
else
{
ObjectMapper objectMapper = new ObjectMapper();
String jsonifiedValue;
try
{
jsonifiedValue = objectMapper.writeValueAsString(value);
}
catch (JsonProcessingException e)
{
throw new IllegalArgumentException(String.format(
"Cannot convert the argument '%s' to JSON to meet JMS type restrictions",
argument.getKey()));
}
opMessage.setObjectProperty(argument.getKey(), jsonifiedValue);
}
}
producer.send(opMessage);
if (session.getTransacted())
{
session.commit();
}
Message response = consumer.receive(getManagementResponseTimeout());
try
{
int statusCode = response.getIntProperty("statusCode");
if (statusCode < 200 || statusCode > 299)
{
throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"), statusCode);
}
if (response instanceof MapMessage)
{
MapMessage bodyMap = (MapMessage) response;
Map<String, Object> result = new TreeMap<>();
Enumeration mapNames = bodyMap.getMapNames();
while (mapNames.hasMoreElements())
{
String key = (String) mapNames.nextElement();
result.put(key, bodyMap.getObject(key));
}
return result;
}
else if (response instanceof ObjectMessage)
{
return ((ObjectMessage) response).getObject();
}
else if (response instanceof BytesMessage)
{
BytesMessage bytesMessage = (BytesMessage) response;
if (bytesMessage.getBodyLength() == 0)
{
return null;
}
else
{
byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(buf);
return buf;
}
}
throw new IllegalArgumentException(
"Cannot parse the results from a management operation. JMS response message : " + response);
}
finally
{
if (session.getTransacted())
{
session.commit();
}
consumer.close();
}
}