public void messageSubscribe()

in broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java [190:399]


    /**
     * Handles a message-subscribe command by attaching a new consumer target to one or
     * more message sources.
     *
     * <p>Processing order, as implemented below:
     * <ol>
     *   <li>Missing accept/acquire modes are defaulted to {@code EXPLICIT} / {@code PRE_ACQUIRED}.</li>
     *   <li>The command is rejected when no queue is supplied, no destination is supplied,
     *       or the session already has a subscription for the destination.</li>
     *   <li>The sources are resolved: either every non-blank name listed in the
     *       {@code x-multiqueue} argument (when that argument is a collection), or the
     *       single named queue.</li>
     *   <li>A filter manager is built from the arguments, optionally extended with an
     *       arrival-time filter when the replay-period argument is present.</li>
     *   <li>The consumer target is registered with the session and added as a consumer to
     *       every resolved source.</li>
     * </ol>
     *
     * <p>Every failure is reported through {@code exception(session, method, code, text)};
     * this method does not itself throw for the conditions it checks.
     *
     * @param session the session on which the command was received
     * @param method  the subscribe command; its accept and acquire modes are set on the
     *                command object itself when absent
     */
    public void messageSubscribe(ServerSession session, MessageSubscribe method)
    {
        /*
          TODO - work around broken Python tests
          Correct code should read like
          if not hasAcceptMode() exception ILLEGAL_ARGUMENT "Accept-mode not supplied"
          else if not method.hasAcquireMode() exception ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied"
        */
        if (!method.hasAcceptMode())
        {
            method.setAcceptMode(MessageAcceptMode.EXPLICIT);
        }
        if (!method.hasAcquireMode())
        {
            method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED);
        }

        // --- Argument validation: each failure reports an error and stops. ---
        if (!method.hasQueue())
        {
            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
            return;
        }

        final String destination = method.getDestination();
        if (destination == null)
        {
            exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Subscriber must provide a destination. The protocol specification marking the destination argument as optional is considered a mistake.");
            return;
        }
        if (session.getSubscription(destination) != null)
        {
            exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destination '" + destination + "'");
            return;
        }

        // --- Source resolution. ---
        String queueName = method.getQueue();
        final NamedAddressSpace addressSpace = getAddressSpace(session);

        final Collection<MessageSource> sources = new HashSet<>();
        // Looked up unconditionally, although it is only used when x-multiqueue is not in play.
        final MessageSource queue = addressSpace.getAttainedMessageSource(queueName);

        if (method.getArguments() != null && method.getArguments().get("x-multiqueue") instanceof Collection)
        {
            for (Object entry : (Collection<?>) method.getArguments().get("x-multiqueue"))
            {
                final String sourceName = String.valueOf(entry).trim();
                if (sourceName.isEmpty())
                {
                    // Blank entries are skipped rather than treated as missing sources.
                    continue;
                }

                final MessageSource source = addressSpace.getAttainedMessageSource(sourceName);
                if (source == null)
                {
                    // One unknown name invalidates the whole list; the empty set
                    // triggers the NOT_FOUND report below.
                    sources.clear();
                    break;
                }
                sources.add(source);
            }
            // From here on the "queue name" used in error text is the rendered list.
            queueName = method.getArguments().get("x-multiqueue").toString();
        }
        else if (queue != null)
        {
            sources.add(queue);
        }

        if (sources.isEmpty())
        {
            exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
            return;
        }
        if (!verifySessionAccess(session, sources))
        {
            exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
            return;
        }

        // NOTE(review): this value is never read in this method; the call is kept
        // because it cannot be confirmed from here that it has no side effects.
        ProtocolEngine protocolEngine = getServerConnection(session).getAmqpConnection();
        final FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L, 0L);

        // --- Filters. ---
        FilterManager filterManager;
        try
        {
            filterManager = FilterManagerFactory.createManager(method.getArguments());
        }
        catch (AMQInvalidArgumentException invalidFilterArguments)
        {
            exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager");
            return;
        }

        final String replayPeriodKey = AMQPFilterTypes.REPLAY_PERIOD.toString();
        if (method.hasArguments() && method.getArguments().containsKey(replayPeriodKey))
        {
            final Object rawPeriod = method.getArguments().get(replayPeriodKey);

            // Stays null when the value is neither a Number nor a parseable String.
            Long parsedPeriod = null;
            if (rawPeriod instanceof Number)
            {
                parsedPeriod = ((Number) rawPeriod).longValue();
            }
            else if (rawPeriod instanceof String)
            {
                try
                {
                    parsedPeriod = Long.parseLong((String) rawPeriod);
                }
                catch (NumberFormatException unparseable)
                {
                    // Reported immediately below via the shared error path.
                }
            }

            if (parsedPeriod == null)
            {
                exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + rawPeriod + " as a number for filter " + replayPeriodKey);
                return;
            }

            final long period = parsedPeriod.longValue();
            // The period is multiplied by 1000 and subtracted from the current time in millis.
            final long startingFrom = System.currentTimeMillis() - (1000L * period);
            if (filterManager == null)
            {
                filterManager = new FilterManager();
            }
            final MessageFilter arrivalFilter = new ArrivalTimeFilter(startingFrom, period == 0);
            filterManager.add(arrivalFilter.getName(), arrivalFilter);
        }

        // --- Consumer target. ---
        final ConsumerTarget_0_10 target = new ConsumerTarget_0_10(session,
                                                                   destination,
                                                                   method.getAcceptMode(),
                                                                   method.getAcquireMode(),
                                                                   MessageFlowMode.WINDOW,
                                                                   creditManager,
                                                                   method.getArguments(),
                                                                   sources.size() > 1);

        // Optional consumer priority; left null when absent or not usable as a number.
        Integer priority = null;
        if (method.hasArguments() && method.getArguments().containsKey("x-priority"))
        {
            final Object rawPriority = method.getArguments().get("x-priority");
            if (rawPriority instanceof Number)
            {
                priority = ((Number) rawPriority).intValue();
            }
            else if (rawPriority instanceof String)
            {
                try
                {
                    priority = Integer.parseInt((String) rawPriority);
                }
                catch (NumberFormatException ignored)
                {
                    // Best effort: an unparseable priority is dropped and the
                    // subscription proceeds without one.
                }
            }
        }

        // --- Registration and attachment to every source. ---
        session.register(destination, target);
        try
        {
            final EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
            final MessageAcquireMode acquireMode = method.getAcquireMode();

            if (acquireMode == MessageAcquireMode.PRE_ACQUIRED)
            {
                options.add(ConsumerOption.ACQUIRES);
            }
            if (acquireMode != MessageAcquireMode.NOT_ACQUIRED
                || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
            {
                options.add(ConsumerOption.SEES_REQUEUES);
            }
            if (method.getExclusive())
            {
                options.add(ConsumerOption.EXCLUSIVE);
            }

            for (MessageSource source : sources)
            {
                source.addConsumer(target,
                                   filterManager,
                                   MessageTransferMessage.class,
                                   destination,
                                   options,
                                   priority);
            }
            target.updateNotifyWorkDesired();
        }
        catch (Queue.ExistingExclusiveConsumer alreadyExclusive)
        {
            exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
        }
        catch (Queue.ExistingConsumerPreventsExclusive otherConsumerPresent)
        {
            exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
        }
        catch (AccessControlException accessDenied)
        {
            exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, accessDenied.getMessage());
        }
        catch (MessageSource.ConsumerAccessRefused accessRefused)
        {
            exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy");
        }
        catch (MessageSource.QueueDeleted sourceGone)
        {
            exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue was deleted");
        }
    }