in client/src/main/java/org/apache/qpid/client/AMQSession.java [3114:3185]
private void registerConsumer(C consumer, boolean nowait) throws QpidException
{
AMQDestination amqd = consumer.getDestination();
if (amqd.getDestSyntax() == DestSyntax.ADDR)
{
resolveAddress(amqd,true,consumer.isNoLocal());
}
else
{
if (_declareExchanges && !amqd.neverDeclare())
{
declareExchange(amqd, nowait);
}
if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
{
declareQueue(amqd, consumer.isNoLocal(), nowait);
}
if (_bindQueues && !amqd.neverDeclare() && !amqd.isDefaultExchange())
{
if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey()))
{
bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(),
amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait);
}
}
}
String queueName = amqd.getAMQQueueName();
// store the consumer queue name
consumer.setQueuename(queueName);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
{
// The dispatcher will be null if we have just created this session
// so suspend the channel before we register our consumer so that we don't
// start prefetching until a receive/mListener is set.
if (_dispatcher == null)
{
if (!isSuspended())
{
try
{
suspendChannel(true);
_logger.debug(
"Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
}
catch (QpidException e)
{
_logger.info("Suspending channel threw an exception:", e);
}
}
}
}
else
{
_logger.debug("Immediately prefetching existing messages to new consumer.");
}
try
{
consumeFromQueue(consumer, queueName, nowait);
}
catch (FailoverException e)
{
throw new QpidException("Fail-over exception interrupted basic consume.", e);
}
}