public Object performOperationUsingAmqpManagement()

in systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java [276:411]


    /**
     * Invokes a management operation on the object identified by {@code name} by sending a request
     * message to the management address and waiting for the reply.
     * <p>
     * The request is a {@link MapMessage} carrying the string properties {@code type}, {@code operation},
     * {@code index} (always {@code "object-path"}) and {@code key} (the object name). Each entry of
     * {@code arguments} is added as a message property: {@link String} values are passed through
     * unchanged, every other value is serialised to a JSON string first.
     * <p>
     * For AMQP 1.0 the reply is received on a temporary queue that is deleted before this method
     * returns; for other protocols the fixed reply destinations of this class are used. If the session
     * is transacted it is committed after the send and again after the reply has been processed.
     *
     * @param name      value of the {@code key} property identifying the target object
     * @param operation name of the operation to invoke
     * @param session   session used to create the producer, the consumer and the messages
     * @param type      value of the {@code type} property of the request
     * @param arguments operation arguments; must not be {@code null} and must not contain {@code null} values
     * @return the decoded response body: a {@link List} for a {@link StreamMessage}, a key-sorted
     *         {@link Map} for a {@link MapMessage}, the payload object for an {@link ObjectMessage}, or
     *         a {@code byte[]} for a {@link BytesMessage} ({@code null} if its body is empty)
     * @throws JMSException                   if any JMS call fails
     * @throws OperationUnsuccessfulException if the response status code is outside the range 200-299
     * @throws IllegalStateException          if no response arrives within the receive timeout
     * @throws IllegalArgumentException       if an argument cannot be converted to JSON, or the response
     *                                        is of a message type this method cannot decode
     */
    public Object performOperationUsingAmqpManagement(final String name,
                                                      final String operation,
                                                      final Session session,
                                                      final String type,
                                                      final Map<String, Object> arguments)
            throws JMSException
    {
        final long receiveTimeout = 5000;

        final Destination replyToDestination;
        final Destination replyConsumerDestination;
        if (_protocol == Protocol.AMQP_1_0)
        {
            replyToDestination = session.createTemporaryQueue();
            replyConsumerDestination = replyToDestination;
        }
        else
        {
            replyToDestination = session.createQueue(AMQP_0_X_REPLY_TO_DESTINATION);
            replyConsumerDestination = session.createQueue(AMQP_0_X_CONSUMER_REPLY_DESTINATION);
        }

        final MessageConsumer consumer = session.createConsumer(replyConsumerDestination);
        try
        {
            final MessageProducer producer = session.createProducer(session.createQueue(_managementAddress));
            try
            {
                final MapMessage opMessage = session.createMapMessage();
                opMessage.setStringProperty("type", type);
                opMessage.setStringProperty("operation", operation);
                opMessage.setStringProperty("index", "object-path");
                opMessage.setJMSReplyTo(replyToDestination);
                opMessage.setStringProperty("key", name);

                // Created on first use and shared by all arguments that need JSON conversion.
                ObjectMapper objectMapper = null;
                for (final Map.Entry<String, Object> argument : arguments.entrySet())
                {
                    final Object value = argument.getValue();
                    // NOTE(review): getClass().isPrimitive() is never true for an object reference, so
                    // boxed numbers/booleans currently take the JSON branch and are sent as strings.
                    // Left unchanged because altering it would change what is sent on the wire.
                    if (value.getClass().isPrimitive() || value instanceof String)
                    {
                        opMessage.setObjectProperty(argument.getKey(), value);
                    }
                    else
                    {
                        if (objectMapper == null)
                        {
                            objectMapper = new ObjectMapper();
                        }
                        final String jsonifiedValue;
                        try
                        {
                            jsonifiedValue = objectMapper.writeValueAsString(value);
                        }
                        catch (JsonProcessingException e)
                        {
                            // Keep the original failure as the cause so the reason is not lost.
                            throw new IllegalArgumentException(String.format(
                                    "Cannot convert the argument '%s' to JSON to meet JMS type restrictions",
                                    argument.getKey()), e);
                        }
                        opMessage.setObjectProperty(argument.getKey(), jsonifiedValue);
                    }
                }

                producer.send(opMessage);
                if (session.getTransacted())
                {
                    session.commit();
                }
            }
            finally
            {
                // The producer is only needed for the single request.
                producer.close();
            }

            final Message response = consumer.receive(receiveTimeout);
            try
            {
                // receive(timeout) yields null when the timeout expires without a message.
                if (response == null)
                {
                    throw new IllegalStateException(String.format(
                            "No response received within %d ms for management operation '%s' on '%s'",
                            receiveTimeout,
                            operation,
                            name));
                }

                final int statusCode = response.getIntProperty("statusCode");
                if (statusCode < 200 || statusCode > 299)
                {
                    throw new OperationUnsuccessfulException(response.getStringProperty("statusDescription"),
                                                             statusCode);
                }
                if (response instanceof StreamMessage)
                {
                    final StreamMessage bodyStream = (StreamMessage) response;
                    final List<Object> result = new ArrayList<>();
                    boolean done = false;
                    do
                    {
                        try
                        {
                            result.add(bodyStream.readObject());
                        }
                        catch (MessageEOFException mfe)
                        {
                            // Expected - end of stream
                            done = true;
                        }
                    }
                    while (!done);
                    return result;
                }
                else if (response instanceof MapMessage)
                {
                    final MapMessage bodyMap = (MapMessage) response;
                    final Map<String, Object> result = new TreeMap<>();
                    final Enumeration<?> mapNames = bodyMap.getMapNames();
                    while (mapNames.hasMoreElements())
                    {
                        final String key = (String) mapNames.nextElement();
                        result.put(key, bodyMap.getObject(key));
                    }
                    return result;
                }
                else if (response instanceof ObjectMessage)
                {
                    return ((ObjectMessage) response).getObject();
                }
                else if (response instanceof BytesMessage)
                {
                    final BytesMessage bytesMessage = (BytesMessage) response;
                    final long bodyLength = bytesMessage.getBodyLength();
                    if (bodyLength == 0)
                    {
                        return null;
                    }
                    else
                    {
                        final byte[] buf = new byte[(int) bodyLength];
                        bytesMessage.readBytes(buf);
                        return buf;
                    }
                }
                throw new IllegalArgumentException(
                        "Cannot parse the results from a management operation.  JMS response message : " + response);
            }
            finally
            {
                if (session.getTransacted())
                {
                    session.commit();
                }
            }
        }
        finally
        {
            // Always release the consumer, and the temporary reply queue, even if an earlier step failed.
            try
            {
                consumer.close();
            }
            finally
            {
                if (_protocol == Protocol.AMQP_1_0)
                {
                    ((TemporaryQueue) replyToDestination).delete();
                }
            }
        }
    }