in broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java [2926:3147]
public void receiveQueueDeclare(final AMQShortString queueStr,
final boolean passive,
final boolean durable,
final boolean exclusive,
final boolean autoDelete,
final boolean nowait,
final FieldTable arguments)
{
if(LOGGER.isDebugEnabled())
{
LOGGER.debug("RECV[" + _channelId + "] QueueDeclare[" +" queue: " + queueStr +
" passive: " + passive +
" durable: " + durable +
" exclusive: " + exclusive +
" autoDelete: " + autoDelete + " nowait: " + nowait + " arguments: " + arguments + " ]");
}
NamedAddressSpace virtualHost = _connection.getAddressSpace();
final AMQShortString queueName;
// if we aren't given a queue name, we create one which we return to the client
if ((queueStr == null) || (queueStr.length() == 0))
{
queueName = AMQShortString.createAMQShortString("tmp_" + UUID.randomUUID());
}
else
{
queueName = queueStr;
}
Queue<?> queue;
//TODO: do we need to check that the queue already exists with exactly the same "configuration"?
if (passive)
{
queue = getQueue(queueName.toString());
if (queue == null)
{
closeChannel(ErrorCodes.NOT_FOUND,
"Queue: '"
+ queueName
+ "' not found on VirtualHost '"
+ virtualHost.getName()
+ "'.");
}
else
{
if (!queue.verifySessionAccess(this))
{
_connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Queue '"
+ queue.getName()
+ "' is exclusive, but not created on this Connection.", getChannelId());
}
else
{
//set this as the default queue on the channel:
setDefaultQueue(queue);
if (!nowait)
{
sync();
MethodRegistry methodRegistry = _connection.getMethodRegistry();
QueueDeclareOkBody responseBody =
methodRegistry.createQueueDeclareOkBody(queueName,
queue.getQueueDepthMessages(),
queue.getConsumerCount());
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Queue " + queueName + " declared successfully");
}
}
}
}
}
else
{
try
{
final String queueNameString = AMQShortString.toString(queueName);
Map<String, Object> wireArguments = FieldTable.convertToMap(arguments);
Object alternateExchange = wireArguments.get(ALTERNATE_EXCHANGE);
if (alternateExchange != null)
{
String alternateExchangeName = String.valueOf(alternateExchange);
validateAlternateExchangeIsNotQueue(virtualHost, alternateExchangeName);
}
Queue.BehaviourOnUnknownDeclareArgument unknownArgumentBehaviour =
getConnection().getContextValue(Queue.BehaviourOnUnknownDeclareArgument.class,
Queue.UNKNOWN_QUEUE_DECLARE_ARGUMENT_BEHAVIOUR_NAME);
Map<String, Object> attributes =
QueueArgumentsConverter.convertWireArgsToModel(queueNameString, wireArguments, getModel(), unknownArgumentBehaviour);
attributes.put(Queue.NAME, queueNameString);
attributes.put(Queue.DURABLE, durable);
LifetimePolicy lifetimePolicy;
ExclusivityPolicy exclusivityPolicy;
if (exclusive)
{
lifetimePolicy = autoDelete
? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS
: durable ? LifetimePolicy.PERMANENT : LifetimePolicy.DELETE_ON_CONNECTION_CLOSE;
exclusivityPolicy = durable ? ExclusivityPolicy.CONTAINER : ExclusivityPolicy.CONNECTION;
}
else
{
lifetimePolicy = autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS : LifetimePolicy.PERMANENT;
exclusivityPolicy = ExclusivityPolicy.NONE;
}
if(!attributes.containsKey(Queue.EXCLUSIVE))
{
attributes.put(Queue.EXCLUSIVE, exclusivityPolicy);
}
if(!attributes.containsKey(Queue.LIFETIME_POLICY))
{
attributes.put(Queue.LIFETIME_POLICY, lifetimePolicy);
}
queue = virtualHost.createMessageSource(Queue.class, attributes);
setDefaultQueue(queue);
if (!nowait)
{
sync();
MethodRegistry methodRegistry = _connection.getMethodRegistry();
QueueDeclareOkBody responseBody =
methodRegistry.createQueueDeclareOkBody(queueName,
queue.getQueueDepthMessages(),
queue.getConsumerCount());
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Queue " + queueName + " declared successfully");
}
}
}
catch (AbstractConfiguredObject.DuplicateNameException qe)
{
queue = (Queue<?>) qe.getExisting();
if (!queue.verifySessionAccess(this))
{
_connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Queue '"
+ queue.getName()
+ "' is exclusive, but not created on this Connection.", getChannelId());
}
else if (queue.isExclusive() != exclusive)
{
closeChannel(ErrorCodes.ALREADY_EXISTS,
"Cannot re-declare queue '"
+ queue.getName()
+ "' with different exclusivity (was: "
+ queue.isExclusive()
+ " requested "
+ exclusive
+ ")");
}
else if ((autoDelete
&& queue.getLifetimePolicy() == LifetimePolicy.PERMANENT)
|| (!autoDelete && queue.getLifetimePolicy() != ((exclusive
&& !durable)
? LifetimePolicy.DELETE_ON_CONNECTION_CLOSE
: LifetimePolicy.PERMANENT)))
{
closeChannel(ErrorCodes.ALREADY_EXISTS,
"Cannot re-declare queue '"
+ queue.getName()
+ "' with different lifetime policy (was: "
+ queue.getLifetimePolicy()
+ " requested autodelete: "
+ autoDelete
+ ")");
}
else
{
setDefaultQueue(queue);
if (!nowait)
{
sync();
MethodRegistry methodRegistry = _connection.getMethodRegistry();
QueueDeclareOkBody responseBody =
methodRegistry.createQueueDeclareOkBody(queueName,
queue.getQueueDepthMessages(),
queue.getConsumerCount());
_connection.writeFrame(responseBody.generateFrame(getChannelId()));
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Queue " + queueName + " declared successfully");
}
}
}
}
catch (AccessControlException e)
{
_connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
catch (UnknownAlternateBindingException e)
{
final String message = String.format("Unknown alternate destination: '%s'", e.getAlternateBindingName());
_connection.sendConnectionClose(ErrorCodes.NOT_FOUND, message, getChannelId());
}
catch (IllegalArgumentException | IllegalConfigurationException e)
{
String message = String.format("Error creating queue '%s': %s", queueName, e.getMessage());
_connection.sendConnectionClose(ErrorCodes.INVALID_ARGUMENT, message, getChannelId());
}
}
}