in broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java [190:399]
public void messageSubscribe(ServerSession session, MessageSubscribe method)
{
/*
TODO - work around broken Python tests
Correct code should read like
if not hasAcceptMode() exception ILLEGAL_ARGUMENT "Accept-mode not supplied"
else if not method.hasAcquireMode() exception ExecutionErrorCode.ILLEGAL_ARGUMENT, "Acquire-mode not supplied"
*/
if(!method.hasAcceptMode())
{
method.setAcceptMode(MessageAcceptMode.EXPLICIT);
}
if(!method.hasAcquireMode())
{
method.setAcquireMode(MessageAcquireMode.PRE_ACQUIRED);
}
if(!method.hasQueue())
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "queue not supplied");
}
else
{
String destination = method.getDestination();
if (destination == null)
{
exception(session, method, ExecutionErrorCode.INVALID_ARGUMENT, "Subscriber must provide a destination. The protocol specification marking the destination argument as optional is considered a mistake.");
}
else if(session.getSubscription(destination) != null)
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Subscription already exists with destination '"+destination+"'");
}
else
{
String queueName = method.getQueue();
NamedAddressSpace addressSpace = getAddressSpace(session);
final Collection<MessageSource> sources = new HashSet<>();
final MessageSource queue = addressSpace.getAttainedMessageSource(queueName);
if(method.getArguments() != null && method.getArguments().get("x-multiqueue") instanceof Collection)
{
for(Object object : (Collection<Object>)method.getArguments().get("x-multiqueue"))
{
String sourceName = String.valueOf(object);
sourceName = sourceName.trim();
if(sourceName.length() != 0)
{
MessageSource source = addressSpace.getAttainedMessageSource(sourceName);
if(source == null)
{
sources.clear();
break;
}
else
{
sources.add(source);
}
}
}
queueName = method.getArguments().get("x-multiqueue").toString();
}
else if(queue != null)
{
sources.add(queue);
}
if(sources.isEmpty())
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue: " + queueName + " not found");
}
else if(!verifySessionAccess(session, sources))
{
exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
}
else
{
ProtocolEngine protocolEngine = getServerConnection(session).getAmqpConnection();
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
FilterManager filterManager = null;
try
{
filterManager = FilterManagerFactory.createManager(method.getArguments());
}
catch (AMQInvalidArgumentException amqe)
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager");
return;
}
if(method.hasArguments() && method.getArguments().containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString()))
{
Object value = method.getArguments().get(AMQPFilterTypes.REPLAY_PERIOD.toString());
final long period;
if(value instanceof Number)
{
period = ((Number)value).longValue();
}
else if(value instanceof String)
{
try
{
period = Long.parseLong(value.toString());
}
catch (NumberFormatException e)
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
return;
}
}
else
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
return;
}
final long startingFrom = System.currentTimeMillis() - (1000L * period);
if(filterManager == null)
{
filterManager = new FilterManager();
}
MessageFilter filter = new ArrivalTimeFilter(startingFrom, period == 0);
filterManager.add(filter.getName(), filter);
}
boolean multiQueue = sources.size()>1;
ConsumerTarget_0_10 target = new ConsumerTarget_0_10(session, destination,
method.getAcceptMode(),
method.getAcquireMode(),
MessageFlowMode.WINDOW,
creditManager,
method.getArguments(),
multiQueue
);
Integer priority = null;
if(method.hasArguments() && method.getArguments().containsKey("x-priority"))
{
Object value = method.getArguments().get("x-priority");
if(value instanceof Number)
{
priority = ((Number)value).intValue();
}
else if(value instanceof String)
{
try
{
priority = Integer.parseInt(value.toString());
}
catch (NumberFormatException e)
{
}
}
}
session.register(destination, target);
try
{
EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED)
{
options.add(ConsumerOption.ACQUIRES);
}
if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT)
{
options.add(ConsumerOption.SEES_REQUEUES);
}
if(method.getExclusive())
{
options.add(ConsumerOption.EXCLUSIVE);
}
for(MessageSource source : sources)
{
source.addConsumer(target,
filterManager,
MessageTransferMessage.class,
destination,
options,
priority);
}
target.updateNotifyWorkDesired();
}
catch (Queue.ExistingExclusiveConsumer existing)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an exclusive consumer");
}
catch (Queue.ExistingConsumerPreventsExclusive exclusive)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
}
catch (AccessControlException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
}
catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an incompatible exclusivity policy");
}
catch (MessageSource.QueueDeleted queueDeleted)
{
exception(session, method, ExecutionErrorCode.NOT_FOUND, "Queue was deleted");
}
}
}
}
}