protected void onOpen()

in broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java [373:584]


    /**
     * Completes the queue's initialisation when it is opened.
     * <p>
     * After delegating to the superclass, the following steps run in order:
     * <ol>
     *   <li>the actual attributes, extended with the exclusivity and lifetime policy, are copied into a
     *       synchronized map stored in {@code _arguments};</li>
     *   <li>the housekeeping task is created;</li>
     *   <li>the exclusive owner is derived from the first session principal when there is one, otherwise from
     *       the {@code Queue.OWNER} attribute for the PRINCIPAL and CONTAINER policies;</li>
     *   <li>the lifetime constraint demanded by the lifetime policy is registered;</li>
     *   <li>the message group manager is chosen from the message group type;</li>
     *   <li>context values and the broker's flow-to-disk threshold are read;</li>
     *   <li>default filters are validated and registered;</li>
     *   <li>the hold-on-publish check is installed when enabled;</li>
     *   <li>the alternate binding destination is looked up and referenced;</li>
     *   <li>overflow policy handlers are created and alert checks updated.</li>
     * </ol>
     *
     * @throws ServerScopedRuntimeException if a session is present and the exclusivity policy is not one of the
     *                                      values handled by the switch
     * @throws IllegalArgumentException     if the lifetime policy needs a session or creating link that is not
     *                                      available, if the message group type is not recognised, or if a
     *                                      default filter is malformed or names an unknown filter type
     */
    protected void onOpen()
    {
        super.onOpen();

        final Map<String,Object> actualAttributes = getActualAttributes();

        // Copy of the attributes, extended with the exclusivity and lifetime policy.
        final LinkedHashMap<String, Object> args = new LinkedHashMap<>(actualAttributes);
        args.put(Queue.EXCLUSIVE, _exclusive);
        args.put(Queue.LIFETIME_POLICY, getLifetimePolicy());
        _arguments = Collections.synchronizedMap(args);

        _queueHouseKeepingTask = new AdvanceConsumersTask();

        // Only the first session principal, if any, is consulted.
        final Set<SessionPrincipal> principals = getSessionPrincipals();
        final AMQPSession<?, ?> session =
                principals.isEmpty() ? null : principals.iterator().next().getSession();

        if (session == null)
        {
            // No session available: the owner can only come from the OWNER attribute.
            if (_exclusive == ExclusivityPolicy.PRINCIPAL)
            {
                if (actualAttributes.get(Queue.OWNER) != null)
                {
                    final String ownerName = String.valueOf(actualAttributes.get(Queue.OWNER));
                    Principal principal;
                    try
                    {
                        principal = new GenericPrincipal(ownerName);
                    }
                    catch (IllegalArgumentException e)
                    {
                        // The plain name was rejected; retry with the "@('')" suffix appended.
                        principal = new GenericPrincipal(ownerName + "@('')");
                    }
                    _exclusiveOwner = new AuthenticatedPrincipal(principal);
                }
            }
            else if (_exclusive == ExclusivityPolicy.CONTAINER)
            {
                if (actualAttributes.get(Queue.OWNER) != null)
                {
                    _exclusiveOwner = String.valueOf(actualAttributes.get(Queue.OWNER));
                }
            }
        }
        else
        {
            // A session is available: derive the owner from it according to the policy.
            switch (_exclusive)
            {
                case PRINCIPAL:
                    _exclusiveOwner = session.getAMQPConnection().getAuthorizedPrincipal();
                    break;
                case CONTAINER:
                    _exclusiveOwner = session.getAMQPConnection().getRemoteContainerName();
                    break;
                case CONNECTION:
                    _exclusiveOwner = session.getAMQPConnection();
                    addExclusivityConstraint(session.getAMQPConnection());
                    break;
                case SESSION:
                    _exclusiveOwner = session;
                    addExclusivityConstraint(session);
                    break;
                case NONE:
                case LINK:
                case SHARED_SUBSCRIPTION:
                    // No owner is recorded here for these policies.
                    break;
                default:
                    throw new ServerScopedRuntimeException("Unknown exclusivity policy: "
                                                           + _exclusive
                                                           + " this is a coding error inside Qpid");
            }
        }

        // Register the constraint that ties the queue's lifetime to its creator.
        if (getLifetimePolicy() == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE)
        {
            if (session == null)
            {
                throw new IllegalArgumentException("Queues created with a lifetime policy of "
                                                   + getLifetimePolicy()
                                                   + " must be created from a connection.");
            }
            addLifetimeConstraint(session.getAMQPConnection());
        }
        else if (getLifetimePolicy() == LifetimePolicy.DELETE_ON_SESSION_END)
        {
            if (session == null)
            {
                throw new IllegalArgumentException("Queues created with a lifetime policy of "
                                                   + getLifetimePolicy()
                                                   + " must be created from a connection.");
            }
            addLifetimeConstraint(session);
        }
        else if (getLifetimePolicy() == LifetimePolicy.DELETE_ON_CREATING_LINK_CLOSE)
        {
            if (_creatingLinkInfo == null)
            {
                throw new IllegalArgumentException("Queues created with a lifetime policy of "
                                                   + getLifetimePolicy()
                                                   + " must be created from a AMQP 1.0 link.");
            }

            final LinkModel creatingLink;
            if (_creatingLinkInfo.isSendingLink())
            {
                creatingLink = _virtualHost.getSendingLink(_creatingLinkInfo.getRemoteContainerId(),
                                                           _creatingLinkInfo.getLinkName());
            }
            else
            {
                creatingLink = _virtualHost.getReceivingLink(_creatingLinkInfo.getRemoteContainerId(),
                                                             _creatingLinkInfo.getLinkName());
            }
            addLifetimeConstraint(creatingLink);
        }

        // Choose the message grouping strategy.
        switch (getMessageGroupType())
        {
            case NONE:
                _messageGroupManager = null;
                break;
            case STANDARD:
                _messageGroupManager = new AssignedConsumerMessageGroupManager(getMessageGroupKeyOverride(),
                                                                               getMaximumDistinctGroups());
                break;
            case SHARED_GROUPS:
                _messageGroupManager = new DefinedGroupMessageGroupManager(getMessageGroupKeyOverride(),
                                                                           getMessageGroupDefaultGroup(),
                                                                           this);
                break;
            default:
                throw new IllegalArgumentException("Unknown messageGroupType type " + _messageGroupType);
        }

        _mimeTypeToFileExtension =
                getContextValue(Map.class, MAP_OF_STRING_STRING, MIME_TYPE_TO_FILE_EXTENSION);
        _messageConversionExceptionHandlingPolicy =
                getContextValue(MessageConversionExceptionHandlingPolicy.class,
                                MESSAGE_CONVERSION_EXCEPTION_HANDLING_POLICY);

        _flowToDiskThreshold = getAncestor(Broker.class).getFlowToDiskThreshold();

        if (_defaultFilters != null)
        {
            final Map<String, MessageFilterFactory> factoriesByType =
                    new QpidServiceLoader().getInstancesByType(MessageFilterFactory.class);

            for (Map.Entry<String,Map<String,List<String>>> filterEntry : _defaultFilters.entrySet())
            {
                final String filterName = String.valueOf(filterEntry.getKey());
                final Map<String, List<String>> definition = filterEntry.getValue();

                // Each definition must be a single "type -> arguments" pair.
                if (definition.size() != 1)
                {
                    throw new IllegalArgumentException("Filter value should be a map with one entry, having the type as key and the value being the filter arguments, not " + definition);
                }

                final String typeName = String.valueOf(definition.keySet().iterator().next());
                final MessageFilterFactory factory = factoriesByType.get(typeName);
                if (factory == null)
                {
                    throw new IllegalArgumentException("Unknown filter type " + typeName + ", known types are: " + factoriesByType.keySet());
                }

                final List<String> factoryArguments = definition.values().iterator().next();
                // Build one instance now so that invalid arguments are rejected here;
                // the instance itself is discarded.
                factory.newInstance(factoryArguments);
                _defaultFiltersMap.put(filterName, () -> factory.newInstance(factoryArguments));
            }
        }

        if (isHoldOnPublishEnabled())
        {
            // Hold a message while its "not valid before" value is at or after the evaluation time.
            _holdMethods.add((reference, now) ->
                    reference.getMessage().getMessageHeader().getNotValidBefore() >= now);
        }

        if (getAlternateBinding() != null)
        {
            final String destinationName = getAlternateBinding().getDestination();
            _alternateBindingDestination = getOpenedMessageDestination(destinationName);
            if (_alternateBindingDestination == null)
            {
                LOGGER.warn("Cannot find alternate binding destination '{}' for queue '{}'", destinationName, toString());
            }
            else
            {
                _alternateBindingDestination.addReference(this);
            }
        }

        createOverflowPolicyHandlers(_overflowPolicy);

        updateAlertChecks();
    }