in broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java [2530:2699]
public void receiveExchangeDeclare(final AMQShortString exchangeName,
final AMQShortString type,
final boolean passive,
final boolean durable,
final boolean autoDelete,
final boolean internal,
final boolean nowait,
final FieldTable arguments)
{
if(LOGGER.isDebugEnabled())
{
LOGGER.debug("RECV[" + _channelId + "] ExchangeDeclare[" +" exchange: " + exchangeName +
" type: " + type +
" passive: " + passive +
" durable: " + durable +
" autoDelete: " + autoDelete +
" internal: " + internal + " nowait: " + nowait + " arguments: " + arguments + " ]");
}
final MethodRegistry methodRegistry = _connection.getMethodRegistry();
final AMQMethodBody declareOkBody = methodRegistry.createExchangeDeclareOkBody();
Exchange<?> exchange;
NamedAddressSpace virtualHost = _connection.getAddressSpace();
if (isDefaultExchange(exchangeName))
{
if (!AMQShortString.createAMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type))
{
_connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Attempt to redeclare default exchange: "
+ " of type "
+ ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ " to " + type + ".", getChannelId());
}
else if (!nowait)
{
sync();
_connection.writeFrame(declareOkBody.generateFrame(getChannelId()));
}
}
else
{
if (passive)
{
exchange = getExchange(exchangeName.toString());
if (exchange == null)
{
closeChannel(ErrorCodes.NOT_FOUND, "Unknown exchange: '" + exchangeName + "'");
}
else if (!(type == null || type.length() == 0) && !exchange.getType().equals(type.toString()))
{
_connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Attempt to redeclare exchange: '"
+ exchangeName
+ "' of type "
+ exchange.getType()
+ " to "
+ type
+ ".", getChannelId());
}
else if (!nowait)
{
sync();
_connection.writeFrame(declareOkBody.generateFrame(getChannelId()));
}
}
else
{
String name = exchangeName.toString();
String typeString = type == null ? null : type.toString();
try
{
Map<String, Object> attributes = new HashMap<>();
if (arguments != null)
{
attributes.putAll(FieldTable.convertToMap(arguments));
}
attributes.put(Exchange.NAME, name);
attributes.put(Exchange.TYPE, typeString);
attributes.put(Exchange.DURABLE, durable);
attributes.put(Exchange.LIFETIME_POLICY,
autoDelete ? LifetimePolicy.DELETE_ON_NO_LINKS : LifetimePolicy.PERMANENT);
Object alternateExchange = attributes.remove(ALTERNATE_EXCHANGE);
if (alternateExchange != null)
{
String alternateExchangeName = String.valueOf(alternateExchange);
validateAlternateExchangeIsNotQueue(virtualHost, alternateExchangeName);
attributes.put(Exchange.ALTERNATE_BINDING,
Collections.singletonMap(AlternateBinding.DESTINATION, alternateExchangeName));
}
validateAndSanitizeExchangeDeclareArguments(attributes);
exchange = virtualHost.createMessageDestination(Exchange.class, attributes);
if (!nowait)
{
sync();
_connection.writeFrame(declareOkBody.generateFrame(getChannelId()));
}
}
catch (ReservedExchangeNameException e)
{
Exchange existing = getExchange(name);
if (existing == null || !existing.getType().equals(typeString))
{
_connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED,
"Attempt to declare exchange: '" + exchangeName +
"' which begins with reserved prefix.", getChannelId());
}
else if(!nowait)
{
sync();
_connection.writeFrame(declareOkBody.generateFrame(getChannelId()));
}
}
catch (AbstractConfiguredObject.DuplicateNameException e)
{
exchange = (Exchange<?>) e.getExisting();
if (!exchange.getType().equals(typeString))
{
_connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Attempt to redeclare exchange: '"
+ exchangeName + "' of type "
+ exchange.getType()
+ " to " + type + ".", getChannelId());
}
else
{
if (!nowait)
{
sync();
_connection.writeFrame(declareOkBody.generateFrame(getChannelId()));
}
}
}
catch (NoFactoryForTypeException e)
{
_connection.sendConnectionClose(ErrorCodes.COMMAND_INVALID, "Unknown exchange type '"
+ e.getType()
+ "' for exchange '"
+ exchangeName
+ "'", getChannelId());
}
catch (AccessControlException e)
{
_connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, e.getMessage(), getChannelId());
}
catch (UnknownAlternateBindingException e)
{
final String message = String.format("Unknown alternate destination '%s'", e.getAlternateBindingName());
_connection.sendConnectionClose(ErrorCodes.NOT_FOUND, message, getChannelId());
}
catch (IllegalArgumentException | IllegalConfigurationException e)
{
_connection.sendConnectionClose(ErrorCodes.INVALID_ARGUMENT, "Error creating exchange '"
+ exchangeName
+ "': "
+ e.getMessage(), getChannelId());
}
}
}
}