in client/src/main/java/org/apache/qpid/client/AMQSession.java [2111:2199]
/**
 * Creates a consumer on the given destination and registers it with this session.
 * <p>
 * The creation and registration steps are packaged as a {@code FailoverProtectedOperation}
 * and executed through a {@code FailoverRetrySupport} bound to {@code _connection}.
 *
 * @param destination  destination to consume from; it is cast to {@code AMQDestination}
 * @param prefetchHigh passed through unchanged to {@code createMessageConsumer}
 * @param prefetchLow  passed through unchanged to {@code createMessageConsumer}
 * @param noLocal      passed through unchanged to {@code createMessageConsumer}
 * @param exclusive    passed through unchanged to {@code createMessageConsumer}
 * @param selector     message selector; replaced by {@code null} when {@code _strictAMQP} is set,
 *                     the selector is non-empty and {@code _strictAMQPFATAL} is not set
 * @param rawSelector  passed through unchanged to {@code createMessageConsumer}
 * @param noConsume    must be {@code true} when the destination is browse-only
 * @param autoClose    passed through unchanged to {@code createMessageConsumer}
 * @return the newly created and registered consumer
 * @throws InvalidDestinationException   if {@code noConsume} is {@code false} but the destination
 *                                       is browse-only, or if registration reports an invalid routing key
 * @throws InvalidSelectorException      if registration reports an invalid argument
 * @throws UnsupportedOperationException if a non-empty selector is supplied while both
 *                                       {@code _strictAMQP} and {@code _strictAMQPFATAL} are set
 * @throws JMSException                  for any other failure while creating or registering the consumer
 */
protected C createConsumerImpl(final Destination destination, final int prefetchHigh,
                               final int prefetchLow, final boolean noLocal,
                               final boolean exclusive, String selector, final Map<String,Object> rawSelector,
                               final boolean noConsume, final boolean autoClose) throws JMSException
{
    checkTemporaryDestination(destination);

    // A browse-only destination may only be read by a consumer that does not consume.
    if (!noConsume && isBrowseOnlyDestination(destination))
    {
        throw new InvalidDestinationException("The consumer being created is not 'noConsume'," +
                                              "but a 'browseOnly' Destination has been supplied.");
    }

    // In strict AMQP mode a non-empty selector is either rejected outright or silently dropped.
    final boolean selectorRejected = _strictAMQP && selector != null && !selector.equals("");
    if (selectorRejected && _strictAMQPFATAL)
    {
        throw new UnsupportedOperationException("Selectors not currently supported by AMQP.");
    }
    final String messageSelector = selectorRejected ? null : selector;

    final FailoverProtectedOperation<C, JMSException> createAndRegister =
            new FailoverProtectedOperation<C, JMSException>()
            {
                public C execute() throws JMSException, FailoverException
                {
                    checkNotClosed();

                    final AMQDestination amqDestination = (AMQDestination) destination;

                    final C created;
                    try
                    {
                        created = createMessageConsumer(amqDestination, prefetchHigh, prefetchLow,
                                                        noLocal, exclusive, messageSelector, rawSelector,
                                                        noConsume, autoClose);
                    }
                    catch (TransportException transportFailure)
                    {
                        throw toJMSException("Exception while creating consumer: " + transportFailure.getMessage(),
                                             transportFailure);
                    }

                    // Hand the session's listener, when one is set, to the new consumer.
                    if (_messageListener != null)
                    {
                        created.setMessageListener(_messageListener);
                    }

                    try
                    {
                        registerConsumer(created, false);
                    }
                    catch (AMQInvalidArgumentException invalidArgument)
                    {
                        throw JMSExceptionHelper.chainJMSException(
                                new InvalidSelectorException(invalidArgument.getMessage()), invalidArgument);
                    }
                    catch (AMQInvalidRoutingKeyException invalidRoutingKey)
                    {
                        throw JMSExceptionHelper.chainJMSException(
                                new InvalidDestinationException("Invalid routing key:" + amqDestination.getRoutingKey()),
                                invalidRoutingKey);
                    }
                    catch (QpidException registrationFailure)
                    {
                        // A closed channel triggers close(-1, false) on the session before the error is reported.
                        if (registrationFailure instanceof AMQChannelClosedException)
                        {
                            close(-1, false);
                        }
                        throw toJMSException("Error registering consumer: " + registrationFailure, registrationFailure);
                    }
                    catch (TransportException transportFailure)
                    {
                        throw toJMSException("Exception while registering consumer:" + transportFailure.getMessage(),
                                             transportFailure);
                    }

                    return created;
                }
            };

    return new FailoverRetrySupport<C, JMSException>(createAndRegister, _connection).execute();
}