private AMQShortString consumeFromSource()

in broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java [560:713]


    /**
     * Creates a consumer target for the given tag and registers it as a consumer on every supplied source.
     * <p>
     * The kind of target depends on the arguments: a browser target when the {@code NO_CONSUME} argument is
     * {@code Boolean.TRUE}, otherwise an acknowledging target (using the channel's credit manager) when
     * {@code acks} is set, otherwise a no-ack target (using the infinite credit manager). The non-browser
     * targets are registered with the {@code ACQUIRES} and {@code SEES_REQUEUES} options.
     * <p>
     * If anything listed in the catch clause fails during registration, the tag is removed again from both
     * the tag-to-target map and the set of client-supplied tags before the exception is rethrown.
     *
     * @param tag       the consumer tag requested by the client, or {@code null} to have one generated
     *                  (generated tags have the form {@code "sgen_" + getNextConsumerTag()})
     * @param sources   the message sources to consume from; more than one marks the target as multi-queue
     * @param acks      whether deliveries are to be acknowledged by the consumer
     * @param arguments the consumer arguments; may be {@code null}. Recognised here: the {@code NO_CONSUME}
     *                  and {@code REPLAY_PERIOD} filter arguments and {@code "x-priority"}
     * @param exclusive whether to add the {@code EXCLUSIVE} consumer option
     * @param noLocal   whether to add a {@link NoLocalFilter} to the consumer's filters
     * @return the consumer tag actually used (the supplied tag, or the generated one)
     * @throws ConsumerTagInUseException   if the supplied tag is already in use on this channel
     * @throws AMQInvalidArgumentException if the {@code REPLAY_PERIOD} argument is not a number or a numeric
     *                                     string (it is also part of the rethrown set from registration)
     * @throws MessageSource.ExistingConsumerPreventsExclusive rethrown from registration
     * @throws MessageSource.ExistingExclusiveConsumer         rethrown from registration
     * @throws MessageSource.ConsumerAccessRefused             rethrown from registration
     * @throws MessageSource.QueueDeleted                      rethrown from registration
     */
    private AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
                                            FieldTable arguments, boolean exclusive, boolean noLocal)
            throws MessageSource.ExistingConsumerPreventsExclusive,
                   MessageSource.ExistingExclusiveConsumer,
                   AMQInvalidArgumentException,
                   MessageSource.ConsumerAccessRefused, ConsumerTagInUseException, MessageSource.QueueDeleted
    {
        // Remember whether the caller supplied the tag: only client-supplied tags are recorded in
        // _nonGeneratedTags, and only those must be rolled back from that set on failure.
        final boolean clientSuppliedTag = tag != null;
        if (!clientSuppliedTag)
        {
            // Generate tags until one does not clash with a tag a client has explicitly chosen.
            do
            {
                tag = AMQShortString.createAMQShortString("sgen_" + getNextConsumerTag());
            }
            while (_nonGeneratedTags.contains(tag));
        }
        else if (!_nonGeneratedTags.add(tag))
        {
            throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
        }

        if (_tag2SubscriptionTargetMap.containsKey(tag))
        {
            if (clientSuppliedTag)
            {
                // The add() above succeeded, so this call introduced the entry; undo it so that the
                // rejected request does not leave the tag permanently marked as in use.
                _nonGeneratedTags.remove(tag);
            }
            throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
        }

        ConsumerTarget_0_8 target;
        EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
        final boolean multiQueue = sources.size()>1;
        if(arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
        {
            // Browser: no ACQUIRES / SEES_REQUEUES options are requested for this target.
            target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments,
                                                            INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
        }
        else if(acks)
        {
            target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager, multiQueue);
            options.add(ConsumerOption.ACQUIRES);
            options.add(ConsumerOption.SEES_REQUEUES);
        }
        else
        {
            target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments,
                                                          INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
            options.add(ConsumerOption.ACQUIRES);
            options.add(ConsumerOption.SEES_REQUEUES);
        }

        if(exclusive)
        {
            options.add(ConsumerOption.EXCLUSIVE);
        }


        // The target is put into the tag map BEFORE it is registered with the sources: the asynchronous
        // delivery process may auto-close the subscriber, which removes the map entry, and that removal
        // must not run ahead of the put. To keep things straight, the exceptions from registration are
        // caught below and the map entry is tidied up there.

        _tag2SubscriptionTargetMap.put(tag, target);

        try
        {
            // NOTE(review): arguments may be null at this point; convertToMap is presumably null-tolerant.
            FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments));
            if(noLocal)
            {
                if(filterManager == null)
                {
                    filterManager = new FilterManager();
                }
                MessageFilter filter = new NoLocalFilter();
                filterManager.add(filter.getName(), filter);
            }

            if(arguments != null && arguments.containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString()))
            {
                Object value = arguments.get(AMQPFilterTypes.REPLAY_PERIOD.toString());
                final long period;
                if(value instanceof Number)
                {
                    period = ((Number)value).longValue();
                }
                else if(value instanceof String)
                {
                    try
                    {
                        period = Long.parseLong(value.toString());
                    }
                    catch (NumberFormatException e)
                    {
                        throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
                    }
                }
                else
                {
                    throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
                }

                // The period is multiplied by 1000 and subtracted from the current time in milliseconds,
                // i.e. it is treated as a number of seconds before "now".
                final long startingFrom = System.currentTimeMillis() - (1000L * period);
                if(filterManager == null)
                {
                    filterManager = new FilterManager();
                }
                MessageFilter filter = new ArrivalTimeFilter(startingFrom, period==0);
                filterManager.add(filter.getName(), filter);

            }

            // Optional consumer priority; stays null (the source's default) when absent or unparseable.
            Integer priority = null;
            if(arguments != null && arguments.containsKey("x-priority"))
            {
                Object value = arguments.get("x-priority");
                if(value instanceof Number)
                {
                    priority = ((Number)value).intValue();
                }
                else if(value instanceof String || value instanceof AMQShortString)
                {
                    try
                    {
                        priority = Integer.parseInt(value.toString());
                    }
                    catch (NumberFormatException e)
                    {
                        // Deliberately ignored: a non-numeric priority falls back to the default value.
                    }
                }

            }



            // NOTE(review): if addConsumer fails on a later source, the target stays registered on the
            // sources already processed; only the channel-side bookkeeping is rolled back below.
            for(MessageSource source : sources)
            {
                source.addConsumer(target,
                                   filterManager,
                                   AMQMessage.class,
                                   AMQShortString.toString(tag),
                                   options, priority);
            }
            target.updateNotifyWorkDesired();
        }
        catch (AccessControlException
                | MessageSource.ExistingExclusiveConsumer
                | MessageSource.ExistingConsumerPreventsExclusive
                | MessageSource.QueueDeleted
                | AMQInvalidArgumentException
                | MessageSource.ConsumerAccessRefused e)
        {
            // Undo the channel-side registration so the tag can be reused, then propagate the failure.
            _tag2SubscriptionTargetMap.remove(tag);
            _nonGeneratedTags.remove(tag);
            throw e;
        }
        return tag;
    }