public void receiveBasicConsume()

in broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java [1724:1862]


    /**
     * Handles a received BasicConsume request on this channel.
     * <p>
     * The message sources to consume from are resolved as follows:
     * <ul>
     *   <li>if {@code arguments} carries an {@code "x-multiqueue"} entry that is a {@link Collection}, every
     *       non-blank element (after {@code String.valueOf} and trimming) is looked up on the connection's
     *       address space; if any lookup yields {@code null} the whole set is discarded;</li>
     *   <li>otherwise the single source named by {@code queue} is used, or the result of
     *       {@code getDefaultQueue()} when {@code queue} converts to a {@code null} name.</li>
     * </ul>
     * If no source could be resolved, the channel is closed with {@code NOT_FOUND} when a name is known, or the
     * connection is closed with {@code NOT_ALLOWED} when no name was supplied. Otherwise the consumer is created
     * through {@code consumeFromSource} and, unless {@code nowait} is set, a BasicConsumeOk frame carrying the
     * returned consumer tag is written. Every failure raised while creating the consumer results in a
     * connection close with a matching error code.
     *
     * @param queue       name of the queue to consume from; a name that converts to {@code null} selects the
     *                    default queue
     * @param consumerTag requested consumer tag, handed to {@code consumeFromSource}, whose return value is the
     *                    tag reported back to the client
     * @param noLocal     forwarded unchanged to {@code consumeFromSource}
     * @param noAck       forwarded negated to {@code consumeFromSource}
     *                    (NOTE(review): presumably "acknowledgements required" - confirm against that method)
     * @param exclusive   forwarded unchanged to {@code consumeFromSource}
     * @param nowait      when {@code true} no BasicConsumeOk response is written
     * @param arguments   optional consume arguments, may be {@code null}; inspected for {@code "x-multiqueue"}
     *                    and forwarded to {@code consumeFromSource}
     */
    public void receiveBasicConsume(final AMQShortString queue,
                                    final AMQShortString consumerTag,
                                    final boolean noLocal,
                                    final boolean noAck,
                                    final boolean exclusive, final boolean nowait, final FieldTable arguments)
    {

        if(LOGGER.isDebugEnabled())
        {
            LOGGER.debug("RECV[" + _channelId + "] BasicConsume[" +" queue: " + queue +
                          " consumerTag: " + consumerTag +
                          " noLocal: " + noLocal +
                          " noAck: " + noAck +
                          " exclusive: " + exclusive + " nowait: " + nowait + " arguments: " + arguments + " ]");
        }

        AMQShortString consumerTag1 = consumerTag;
        NamedAddressSpace vHost = _connection.getAddressSpace();
        sync();
        String queueName = AMQShortString.toString(queue);

        MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getAttainedMessageSource(queueName);
        final Collection<MessageSource> sources = new HashSet<>();

        // Read the multi-queue argument once; it decides both how sources are resolved and how they are
        // named in error messages.
        final Object multiQueue = arguments == null ? null : arguments.get("x-multiqueue");
        final boolean multiQueueRequested = multiQueue instanceof Collection;

        if (multiQueueRequested)
        {
            for (Object object : (Collection<?>) multiQueue)
            {
                final String sourceName = String.valueOf(object).trim();
                if (sourceName.length() != 0)
                {
                    MessageSource source = vHost.getAttainedMessageSource(sourceName);
                    if (source == null)
                    {
                        // One unknown source invalidates the whole request.
                        sources.clear();
                        break;
                    }
                    else
                    {
                        sources.add(source);
                    }
                }
            }
            queueName = multiQueue.toString();
        }
        else if (queue1 != null)
        {
            sources.add(queue1);
        }


        if (sources.isEmpty())
        {
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug("No queue for '" + queueName + "'");
            }
            if (queueName != null)
            {
                closeChannel(ErrorCodes.NOT_FOUND, "No such queue, '" + queueName + "'");
            }
            else
            {
                _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
                                                "No queue name provided, no default queue defined.", _channelId);
            }
        }
        else
        {
            // Name used in the error messages below. In multi-queue mode queue1 is not part of the
            // subscription and may be null, so dereferencing it there would either throw a
            // NullPointerException or name a queue that was never subscribed to; use the multi-queue
            // description instead. In single-queue mode queue1 is non-null here because it is the only
            // element of the non-empty sources set.
            final String subscribedName = multiQueueRequested ? queueName : queue1.getName();
            try
            {
                consumerTag1 = consumeFromSource(consumerTag1,
                                                 sources,
                                                 !noAck,
                                                 arguments,
                                                 exclusive,
                                                 noLocal);
                if (!nowait)
                {
                    MethodRegistry methodRegistry = _connection.getMethodRegistry();
                    AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1);
                    _connection.writeFrame(responseBody.generateFrame(_channelId));

                }
            }
            catch (ConsumerTagInUseException cte)
            {

                _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
                        "Non-unique consumer tag, '" + consumerTag1
                                + "'", _channelId);
            }
            catch (AMQInvalidArgumentException ise)
            {
                _connection.sendConnectionClose(ErrorCodes.ARGUMENT_INVALID, ise.getMessage(), _channelId);


            }
            catch (Queue.ExistingExclusiveConsumer e)
            {
                _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED,
                        "Cannot subscribe to queue '"
                                + subscribedName
                                + "' as it already has an existing exclusive consumer", _channelId);

            }
            catch (Queue.ExistingConsumerPreventsExclusive e)
            {
                _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED,
                        "Cannot subscribe to queue '"
                                + subscribedName
                                + "' exclusively as it already has a consumer", _channelId);

            }
            catch (AccessControlException e)
            {
                _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, "Cannot subscribe to queue '"
                                                                           + subscribedName
                                                                           + "' permission denied", _channelId);

            }
            catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
            {
                _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED,
                        "Cannot subscribe to queue '"
                                + subscribedName
                                + "' as it already has an incompatible exclusivity policy", _channelId);

            }
            catch (MessageSource.QueueDeleted queueDeleted)
            {
                _connection.sendConnectionClose(ErrorCodes.NOT_FOUND,
                                                "Cannot subscribe to queue '"
                                                + subscribedName
                                                + "' as it has been deleted", _channelId);
            }
        }
    }