in broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java [1724:1862]
/**
 * Handles a BasicConsume method received on this channel.
 * <p>
 * The message sources to consume from are resolved in one of two ways:
 * <ul>
 * <li>if {@code arguments} contains an {@code "x-multiqueue"} entry that is a {@link Collection},
 * every non-blank element (after {@code String.valueOf} and {@code trim}) is looked up as a
 * message source on the connection's address space; if any lookup returns {@code null} the
 * whole request is treated as "not found";</li>
 * <li>otherwise the single source named by {@code queue} is used, or the result of
 * {@code getDefaultQueue()} when {@code queue} is {@code null}.</li>
 * </ul>
 * When no source could be resolved the channel is closed with {@code NOT_FOUND} (a name was
 * given) or the connection is closed with {@code NOT_ALLOWED} (no name and no default queue).
 * Failures raised by {@code consumeFromSource} are reported via {@code sendConnectionClose}.
 *
 * @param queue       name of the queue to consume from; may be {@code null}
 * @param consumerTag tag requested by the client; the tag returned by {@code consumeFromSource}
 *                    is the one echoed back in the BasicConsumeOk response
 * @param noLocal     passed through to {@code consumeFromSource}
 * @param noAck       passed negated to {@code consumeFromSource}
 *                    (presumably "acknowledgements required" - confirm against that method)
 * @param exclusive   passed through to {@code consumeFromSource}
 * @param nowait      when {@code true} no BasicConsumeOk frame is written on success
 * @param arguments   optional consume arguments; may be {@code null}
 */
public void receiveBasicConsume(final AMQShortString queue,
                                final AMQShortString consumerTag,
                                final boolean noLocal,
                                final boolean noAck,
                                final boolean exclusive, final boolean nowait, final FieldTable arguments)
{
    if(LOGGER.isDebugEnabled())
    {
        LOGGER.debug("RECV[" + _channelId + "] BasicConsume[" +" queue: " + queue +
                     " consumerTag: " + consumerTag +
                     " noLocal: " + noLocal +
                     " noAck: " + noAck +
                     " exclusive: " + exclusive + " nowait: " + nowait + " arguments: " + arguments + " ]");
    }

    AMQShortString consumerTag1 = consumerTag;
    NamedAddressSpace vHost = _connection.getAddressSpace();
    sync();
    String queueName = AMQShortString.toString(queue);

    MessageSource queue1 = queueName == null ? getDefaultQueue() : vHost.getAttainedMessageSource(queueName);
    final Collection<MessageSource> sources = new HashSet<>();

    // Read the argument once; instanceof is false for null, so a missing entry falls through
    // to the single-queue path.
    final Object multiQueueArgument = arguments == null ? null : arguments.get("x-multiqueue");
    final boolean multiQueue = multiQueueArgument instanceof Collection;
    if (multiQueue)
    {
        for (Object object : (Collection<?>) multiQueueArgument)
        {
            String sourceName = String.valueOf(object).trim();
            if (sourceName.length() != 0)
            {
                MessageSource source = vHost.getAttainedMessageSource(sourceName);
                if (source == null)
                {
                    // One unknown source invalidates the whole request.
                    sources.clear();
                    break;
                }
                sources.add(source);
            }
        }
        queueName = multiQueueArgument.toString();
    }
    else if (queue1 != null)
    {
        sources.add(queue1);
    }

    if (sources.isEmpty())
    {
        if (LOGGER.isDebugEnabled())
        {
            LOGGER.debug("No queue for '" + queueName + "'");
        }
        if (queueName != null)
        {
            closeChannel(ErrorCodes.NOT_FOUND, "No such queue, '" + queueName + "'");
        }
        else
        {
            _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
                                            "No queue name provided, no default queue defined.", _channelId);
        }
    }
    else
    {
        // Name used in error messages. In multi-queue mode queue1 is not among the subscribed
        // sources and may be null (no queue name in the frame and no default queue), so it must
        // not be dereferenced there. In single-queue mode a non-empty 'sources' implies
        // queue1 != null.
        final String subscribedName = multiQueue ? queueName : queue1.getName();
        try
        {
            consumerTag1 = consumeFromSource(consumerTag1,
                                             sources,
                                             !noAck,
                                             arguments,
                                             exclusive,
                                             noLocal);
            if (!nowait)
            {
                MethodRegistry methodRegistry = _connection.getMethodRegistry();
                AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag1);
                _connection.writeFrame(responseBody.generateFrame(_channelId));
            }
        }
        catch (ConsumerTagInUseException cte)
        {
            // consumerTag1 still holds the client-supplied tag here because the assignment
            // above did not complete.
            _connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
                                            "Non-unique consumer tag, '" + consumerTag1
                                            + "'", _channelId);
        }
        catch (AMQInvalidArgumentException ise)
        {
            _connection.sendConnectionClose(ErrorCodes.ARGUMENT_INVALID, ise.getMessage(), _channelId);
        }
        catch (Queue.ExistingExclusiveConsumer e)
        {
            _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED,
                                            "Cannot subscribe to queue '"
                                            + subscribedName
                                            + "' as it already has an existing exclusive consumer", _channelId);
        }
        catch (Queue.ExistingConsumerPreventsExclusive e)
        {
            _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED,
                                            "Cannot subscribe to queue '"
                                            + subscribedName
                                            + "' exclusively as it already has a consumer", _channelId);
        }
        catch (AccessControlException e)
        {
            _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, "Cannot subscribe to queue '"
                                            + subscribedName
                                            + "' permission denied", _channelId);
        }
        catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
        {
            _connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED,
                                            "Cannot subscribe to queue '"
                                            + subscribedName
                                            + "' as it already has an incompatible exclusivity policy", _channelId);
        }
        catch (MessageSource.QueueDeleted queueDeleted)
        {
            _connection.sendConnectionClose(ErrorCodes.NOT_FOUND,
                                            "Cannot subscribe to queue '"
                                            + subscribedName
                                            + "' as it has been deleted", _channelId);
        }
    }
}