private void registerConsumer()

in client/src/main/java/org/apache/qpid/client/AMQSession.java [3114:3185]


    private void registerConsumer(C consumer, boolean nowait) throws QpidException
    {
        AMQDestination amqd = consumer.getDestination();

        if (amqd.getDestSyntax() == DestSyntax.ADDR)
        {
            resolveAddress(amqd,true,consumer.isNoLocal());
        }
        else
        {
            if (_declareExchanges && !amqd.neverDeclare())
            {
                declareExchange(amqd, nowait);
            }

            if ((_declareQueues || amqd.isNameRequired()) && !amqd.neverDeclare())
            {
                declareQueue(amqd, consumer.isNoLocal(), nowait);
            }
            if (_bindQueues && !amqd.neverDeclare() && !amqd.isDefaultExchange())
            {
                if(!isBound(amqd.getExchangeName(), amqd.getAMQQueueName(), amqd.getRoutingKey()))
                {
                    bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(),
                            amqd instanceof AMQTopic ? consumer.getArguments() : null, amqd.getExchangeName(), amqd, nowait);
                }
            }

        }

        String queueName = amqd.getAMQQueueName();

        // store the consumer queue name
        consumer.setQueuename(queueName);

        // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
        if (!_immediatePrefetch)
        {
            // The dispatcher will be null if we have just created this session
            // so suspend the channel before we register our consumer so that we don't
            // start prefetching until a receive/mListener is set.
            if (_dispatcher == null)
            {
                if (!isSuspended())
                {
                    try
                    {
                        suspendChannel(true);
                        _logger.debug(
                                "Prefetching delayed existing messages will not flow until requested via receive*() or setML().");
                    }
                    catch (QpidException e)
                    {
                        _logger.info("Suspending channel threw an exception:", e);
                    }
                }
            }
        }
        else
        {
            _logger.debug("Immediately prefetching existing messages to new consumer.");
        }

        try
        {
            consumeFromQueue(consumer, queueName, nowait);
        }
        catch (FailoverException e)
        {
            throw new QpidException("Fail-over exception interrupted basic consume.", e);
        }
    }