public Object performOperationUsingAmqpManagement()

in systests/src/main/java/org/apache/qpid/systest/core/brokerj/AmqpManagementFacade.java [248:349]


    public Object performOperationUsingAmqpManagement(final String name,
                                               final String type,
                                               final String operation,
                                               final Map<String, Object> arguments,
                                               final Session session)
            throws JMSException
    {
        Destination replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
        Destination replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);

        MessageConsumer consumer = session.createConsumer(replyConsumerDestination);

        MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));

        MapMessage opMessage = session.createMapMessage();
        opMessage.setStringProperty("type", type);
        opMessage.setStringProperty("operation", operation);
        opMessage.setStringProperty("index", "object-path");
        opMessage.setJMSReplyTo(replyToDestination);

        opMessage.setStringProperty("key", name);
        for (Map.Entry<String, Object> argument : arguments.entrySet())
        {
            Object value = argument.getValue();
            if (value.getClass().isPrimitive() || value instanceof String)
            {
                opMessage.setObjectProperty(argument.getKey(), value);
            }
            else
            {
                ObjectMapper objectMapper = new ObjectMapper();
                String jsonifiedValue;
                try
                {
                    jsonifiedValue = objectMapper.writeValueAsString(value);
                }
                catch (JsonProcessingException e)
                {
                    throw new IllegalArgumentException(String.format(
                            "Cannot convert the argument '%s' to JSON to meet JMS type restrictions",
                            argument.getKey()));
                }
                opMessage.setObjectProperty(argument.getKey(), jsonifiedValue);
            }
        }

        producer.send(opMessage);
        if (session.getTransacted())
        {
            session.commit();
        }

        Message response = consumer.receive(getManagementResponseTimeout());
        try
        {
            int statusCode = response.getIntProperty("statusCode");
            if (statusCode < 200 || statusCode > 299)
            {
                throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"), statusCode);
            }
            if (response instanceof MapMessage)
            {
                MapMessage bodyMap = (MapMessage) response;
                Map<String, Object> result = new TreeMap<>();
                Enumeration mapNames = bodyMap.getMapNames();
                while (mapNames.hasMoreElements())
                {
                    String key = (String) mapNames.nextElement();
                    result.put(key, bodyMap.getObject(key));
                }
                return result;
            }
            else if (response instanceof ObjectMessage)
            {
                return ((ObjectMessage) response).getObject();
            }
            else if (response instanceof BytesMessage)
            {
                BytesMessage bytesMessage = (BytesMessage) response;
                if (bytesMessage.getBodyLength() == 0)
                {
                    return null;
                }
                else
                {
                    byte[] buf = new byte[(int) bytesMessage.getBodyLength()];
                    bytesMessage.readBytes(buf);
                    return buf;
                }
            }
            throw new IllegalArgumentException(
                    "Cannot parse the results from a management operation.  JMS response message : " + response);
        }
        finally
        {
            if (session.getTransacted())
            {
                session.commit();
            }
            consumer.close();
        }
    }