in systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java [276:411]
public Object performOperationUsingAmqpManagement(final String name,
final String operation,
final Session session,
final String type,
final Map<String, Object> arguments)
throws JMSException
{
Destination replyToDestination;
Destination replyConsumerDestination;
if (_protocol == Protocol.AMQP_1_0)
{
replyToDestination = session.createTemporaryQueue();
replyConsumerDestination = replyToDestination;
}
else
{
replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
}
final MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
final MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
final MapMessage opMessage = session.createMapMessage();
opMessage.setStringProperty("type", type);
opMessage.setStringProperty("operation", operation);
opMessage.setStringProperty("index", "object-path");
opMessage.setJMSReplyTo(replyToDestination);
opMessage.setStringProperty("key", name);
for (final Map.Entry<String, Object> argument : arguments.entrySet())
{
final Object value = argument.getValue();
if (value.getClass().isPrimitive() || value instanceof String)
{
opMessage.setObjectProperty(argument.getKey(), value);
}
else
{
final ObjectMapper objectMapper = new ObjectMapper();
String jsonifiedValue;
try
{
jsonifiedValue = objectMapper.writeValueAsString(value);
}
catch (JsonProcessingException e)
{
throw new IllegalArgumentException(String.format(
"Cannot convert the argument '%s' to JSON to meet JMS type restrictions",
argument.getKey()));
}
opMessage.setObjectProperty(argument.getKey(), jsonifiedValue);
}
}
producer.send(opMessage);
if (session.getTransacted())
{
session.commit();
}
final Message response = consumer.receive(5000);
try
{
final int statusCode = response.getIntProperty("statusCode");
if (statusCode < 200 || statusCode > 299)
{
throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"), statusCode);
}
if (response instanceof StreamMessage)
{
final StreamMessage bodyStream = (StreamMessage) response;
final List<Object> result = new ArrayList<>();
boolean done = false;
do
{
try
{
result.add(bodyStream.readObject());
}
catch (MessageEOFException mfe)
{
// Expected - end of stream
done = true;
}
}
while (!done);
return result;
}
else if (response instanceof MapMessage)
{
final MapMessage bodyMap = (MapMessage) response;
final Map<String, Object> result = new TreeMap<>();
final Enumeration mapNames = bodyMap.getMapNames();
while (mapNames.hasMoreElements())
{
final String key = (String) mapNames.nextElement();
result.put(key, bodyMap.getObject(key));
}
return result;
}
else if (response instanceof ObjectMessage)
{
return ((ObjectMessage) response).getObject();
}
else if (response instanceof BytesMessage)
{
final BytesMessage bytesMessage = (BytesMessage) response;
if (bytesMessage.getBodyLength() == 0)
{
return null;
}
else
{
final byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
bytesMessage.readBytes(buf);
return buf;
}
}
throw new IllegalArgumentException(
"Cannot parse the results from a management operation. JMS response message : " + response);
}
finally
{
if (session.getTransacted())
{
session.commit();
}
consumer.close();
if(_protocol == Protocol.AMQP_1_0)
{
((TemporaryQueue)replyToDestination).delete();
}
}
}