protected C createConsumerImpl()

in client/src/main/java/org/apache/qpid/client/AMQSession.java [2111:2199]


    protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
                                   final int prefetchLow, final boolean noLocal,
                                   final boolean exclusive, String selector, final Map<String,Object> rawSelector,
                                   final boolean noConsume, final boolean autoClose) throws JMSException
    {
        checkTemporaryDestination(destination);

        if(!noConsume && isBrowseOnlyDestination(destination))
        {
            throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
                                                  "but a 'browseOnly' Destination has been supplied.");
        }

        final String messageSelector;

        if (_strictAMQP && !((selector == null) || selector.equals("")))
        {
            if (_strictAMQPFATAL)
            {
                throw new UnsupportedOperationException("Selectors not currently supported by AMQP.");
            }
            else
            {
                messageSelector = null;
            }
        }
        else
        {
            messageSelector = selector;
        }

        return new FailoverRetrySupport<C, JMSException>(
                new FailoverProtectedOperation<C, JMSException>()
                {
                    public C execute() throws JMSException, FailoverException
                    {
                        checkNotClosed();

                        AMQDestination amqd = (AMQDestination) destination;

                        C consumer;
                        try
                        {
                            consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
                                                             noLocal, exclusive, messageSelector, rawSelector, noConsume, autoClose);
                        }
                        catch(TransportException e)
                        {
                            throw toJMSException("Exception while creating consumer: " + e.getMessage(), e);
                        }

                        if (_messageListener != null)
                        {
                            consumer.setMessageListener(_messageListener);
                        }

                        try
                        {
                            registerConsumer(consumer, false);
                        }
                        catch (AMQInvalidArgumentException ise)
                        {
                            throw JMSExceptionHelper.chainJMSException(new InvalidSelectorException(ise.getMessage()),
                                                                       ise);
                        }
                        catch (AMQInvalidRoutingKeyException e)
                        {
                            throw JMSExceptionHelper.chainJMSException(new InvalidDestinationException(
                                    "Invalid routing key:"
                                    + amqd.getRoutingKey()
                            ), e);
                        }
                        catch (QpidException e)
                        {
                            if (e instanceof AMQChannelClosedException)
                            {
                                close(-1, false);
                            }

                            throw toJMSException("Error registering consumer: " + e,e);
                        }
                        catch (TransportException e)
                        {
                            throw toJMSException("Exception while registering consumer:" + e.getMessage(), e);
                        }
                        return consumer;
                    }
                }, _connection).execute();
    }