in broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java [373:584]
/**
 * Finishes setting up this queue when it is opened.
 * <p>
 * In order, this method: publishes a synchronized copy of the actual attributes (with the exclusivity and
 * lifetime policy added) as the queue arguments; creates the housekeeping task; determines the exclusive
 * owner; adds the lifetime constraint matching the lifetime policy; selects the message group manager;
 * reads context values and the broker flow-to-disk threshold; validates and registers the default filters;
 * installs the hold-on-publish check; resolves the alternate binding destination; and finally creates the
 * overflow policy handlers and updates the alert checks.
 *
 * @throws IllegalArgumentException if the lifetime policy needs a session or a creating link that is absent,
 *                                  if the message group type is not handled, or if a default filter
 *                                  definition does not have exactly one entry or names an unknown type
 * @throws ServerScopedRuntimeException if a session is present and the exclusivity policy is not handled
 */
protected void onOpen()
{
    super.onOpen();

    // Expose the actual attributes, plus exclusivity and lifetime policy, as the queue arguments.
    final Map<String, Object> actualAttributes = getActualAttributes();
    final Map<String, Object> queueArguments = new LinkedHashMap<>(actualAttributes);
    queueArguments.put(Queue.EXCLUSIVE, _exclusive);
    queueArguments.put(Queue.LIFETIME_POLICY, getLifetimePolicy());
    _arguments = Collections.synchronizedMap(queueArguments);

    _queueHouseKeepingTask = new AdvanceConsumersTask();

    // The first session principal (if any) supplies the session this queue is being opened from.
    final Set<SessionPrincipal> principals = getSessionPrincipals();
    final AMQPSession<?, ?> session = principals.isEmpty() ? null : principals.iterator().next().getSession();

    if (session == null)
    {
        // No session available: fall back to the OWNER attribute for the policies that can use it.
        if (_exclusive == ExclusivityPolicy.PRINCIPAL)
        {
            final Object configuredOwner = actualAttributes.get(Queue.OWNER);
            if (configuredOwner != null)
            {
                final String ownerName = String.valueOf(configuredOwner);
                Principal ownerPrincipal;
                try
                {
                    ownerPrincipal = new GenericPrincipal(ownerName);
                }
                catch (IllegalArgumentException e)
                {
                    // The plain owner string was rejected; retry with the "@('')" suffix appended.
                    ownerPrincipal = new GenericPrincipal(ownerName + "@('')");
                }
                _exclusiveOwner = new AuthenticatedPrincipal(ownerPrincipal);
            }
        }
        else if (_exclusive == ExclusivityPolicy.CONTAINER)
        {
            final Object configuredOwner = actualAttributes.get(Queue.OWNER);
            if (configuredOwner != null)
            {
                _exclusiveOwner = String.valueOf(configuredOwner);
            }
        }
    }
    else
    {
        // A session is available: derive the owner from it according to the exclusivity policy.
        switch (_exclusive)
        {
            case PRINCIPAL:
                _exclusiveOwner = session.getAMQPConnection().getAuthorizedPrincipal();
                break;
            case CONTAINER:
                _exclusiveOwner = session.getAMQPConnection().getRemoteContainerName();
                break;
            case CONNECTION:
                _exclusiveOwner = session.getAMQPConnection();
                addExclusivityConstraint(session.getAMQPConnection());
                break;
            case SESSION:
                _exclusiveOwner = session;
                addExclusivityConstraint(session);
                break;
            case NONE:
            case LINK:
            case SHARED_SUBSCRIPTION:
                // Nothing to record for these policies at open time.
                break;
            default:
                throw new ServerScopedRuntimeException("Unknown exclusivity policy: "
                                                       + _exclusive
                                                       + " this is a coding error inside Qpid");
        }
    }

    // Tie the queue's lifetime to the connection, session or creating link named by the policy.
    final LifetimePolicy lifetimePolicy = getLifetimePolicy();
    if (lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE)
    {
        if (session == null)
        {
            throw new IllegalArgumentException("Queues created with a lifetime policy of "
                                               + lifetimePolicy
                                               + " must be created from a connection.");
        }
        addLifetimeConstraint(session.getAMQPConnection());
    }
    else if (lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END)
    {
        if (session == null)
        {
            throw new IllegalArgumentException("Queues created with a lifetime policy of "
                                               + lifetimePolicy
                                               + " must be created from a connection.");
        }
        addLifetimeConstraint(session);
    }
    else if (lifetimePolicy == LifetimePolicy.DELETE_ON_CREATING_LINK_CLOSE)
    {
        if (_creatingLinkInfo == null)
        {
            throw new IllegalArgumentException("Queues created with a lifetime policy of "
                                               + lifetimePolicy
                                               + " must be created from a AMQP 1.0 link.");
        }
        final LinkModel creatingLink = _creatingLinkInfo.isSendingLink()
                ? _virtualHost.getSendingLink(_creatingLinkInfo.getRemoteContainerId(), _creatingLinkInfo.getLinkName())
                : _virtualHost.getReceivingLink(_creatingLinkInfo.getRemoteContainerId(), _creatingLinkInfo.getLinkName());
        addLifetimeConstraint(creatingLink);
    }

    // Choose the message group manager for the configured message group type.
    switch (getMessageGroupType())
    {
        case NONE:
            _messageGroupManager = null;
            break;
        case STANDARD:
            _messageGroupManager = new AssignedConsumerMessageGroupManager(getMessageGroupKeyOverride(),
                                                                           getMaximumDistinctGroups());
            break;
        case SHARED_GROUPS:
            _messageGroupManager = new DefinedGroupMessageGroupManager(getMessageGroupKeyOverride(),
                                                                       getMessageGroupDefaultGroup(),
                                                                       this);
            break;
        default:
            throw new IllegalArgumentException("Unknown messageGroupType type " + _messageGroupType);
    }

    _mimeTypeToFileExtension = getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
    _messageConversionExceptionHandlingPolicy =
            getContextValue(MessageConversionExceptionHandlingPolicy.class,
                            MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY);
    _flowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();

    if (_defaultFilters != null)
    {
        final Map<String, MessageFilterFactory> factoriesByType =
                new QpidServiceLoader().getInstancesByType(MessageFilterFactory.class);

        for (Map.Entry<String, Map<String, List<String>>> filterEntry : _defaultFilters.entrySet())
        {
            final Map<String, List<String>> filterDefinition = filterEntry.getValue();
            if (filterDefinition.size() != 1)
            {
                throw new IllegalArgumentException("Filter value should be a map with one entry, having the type as key and the value being the filter arguments, not " + filterDefinition);
            }

            // The single entry maps the filter type name to the filter arguments.
            final Map.Entry<String, List<String>> typeAndArguments = filterDefinition.entrySet().iterator().next();
            final String filterTypeName = String.valueOf(typeAndArguments.getKey());
            final MessageFilterFactory filterFactory = factoriesByType.get(filterTypeName);
            if (filterFactory == null)
            {
                throw new IllegalArgumentException("Unknown filter type " + filterTypeName + ", known types are: " + factoriesByType.keySet());
            }

            final List<String> filterArguments = typeAndArguments.getValue();
            // Instantiate once now purely to check the arguments are valid.
            filterFactory.newInstance(filterArguments);
            _defaultFiltersMap.put(String.valueOf(filterEntry.getKey()),
                                   () -> filterFactory.newInstance(filterArguments));
        }
    }

    if (isHoldOnPublishEnabled())
    {
        // Hold a message while its not-valid-before value is at or after the evaluation time.
        _holdMethods.add((reference, now) ->
                                 reference.getMessage().getMessageHeader().getNotValidBefore() >= now);
    }

    if (getAlternateBinding() != null)
    {
        final String destinationName = getAlternateBinding().getDestination();
        _alternateBindingDestination = getOpenedMessageDestination(destinationName);
        if (_alternateBindingDestination == null)
        {
            LOGGER.warn("Cannot find alternate binding destination '{}' for queue '{}'", destinationName, toString());
        }
        else
        {
            _alternateBindingDestination.addReference(this);
        }
    }

    createOverflowPolicyHandlers(_overflowPolicy);
    updateAlertChecks();
}