in broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java [560:713]
/**
 * Registers a consumer, identified by {@code tag}, on every one of the given message sources.
 *
 * <p>If {@code tag} is {@code null} a tag of the form {@code sgen_<n>} is generated, skipping any
 * value currently held in {@code _nonGeneratedTags}. A client supplied tag is reserved in
 * {@code _nonGeneratedTags}; the reservation is released again if the consumer cannot be created.
 *
 * <p>The kind of consumer target depends on the arguments:
 * <ul>
 *   <li>the {@code NO_CONSUME} argument set to {@code TRUE}: a browser target using the infinite
 *       credit manager, with neither {@code ACQUIRES} nor {@code SEES_REQUEUES} set;</li>
 *   <li>{@code acks == true}: an acknowledging target using the channel's credit manager;</li>
 *   <li>otherwise: a no-ack target using the infinite credit manager.</li>
 * </ul>
 *
 * @param tag       consumer tag requested by the client, or {@code null} to have one generated
 * @param sources   the message sources the consumer is added to
 * @param acks      whether an acknowledging target is created (ignored for browsers)
 * @param arguments consume arguments (filters, {@code x-priority}, replay period); may be {@code null}
 * @param exclusive whether {@code ConsumerOption.EXCLUSIVE} is requested
 * @param noLocal   whether a {@code NoLocalFilter} is added to the consumer's filters
 * @return the consumer tag actually used (the supplied one, or the generated one)
 * @throws ConsumerTagInUseException if the tag is already in use on this channel
 * @throws AMQInvalidArgumentException if the replay period argument is not numeric
 *         (it may also originate from the filter manager factory - not visible here)
 * @throws MessageSource.ExistingConsumerPreventsExclusive propagated from {@code addConsumer}
 * @throws MessageSource.ExistingExclusiveConsumer propagated from {@code addConsumer}
 * @throws MessageSource.ConsumerAccessRefused propagated from {@code addConsumer}
 * @throws MessageSource.QueueDeleted propagated from {@code addConsumer}
 */
private AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
                                         FieldTable arguments, boolean exclusive, boolean noLocal)
        throws MessageSource.ExistingConsumerPreventsExclusive,
               MessageSource.ExistingExclusiveConsumer,
               AMQInvalidArgumentException,
               MessageSource.ConsumerAccessRefused, ConsumerTagInUseException, MessageSource.QueueDeleted
{
    final boolean tagSuppliedByClient = tag != null;
    if (!tagSuppliedByClient)
    {
        // Generate tags until one is found that does not clash with a client supplied tag.
        do
        {
            tag = AMQShortString.createAMQShortString("sgen_" + getNextConsumerTag());
        }
        while (_nonGeneratedTags.contains(tag));
    }
    else if (!_nonGeneratedTags.add(tag))
    {
        throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
    }

    if (_tag2SubscriptionTargetMap.containsKey(tag))
    {
        if (tagSuppliedByClient)
        {
            // The reservation made just above must not outlive this failed request, otherwise the
            // tag stays marked as "in use" even though this call created no consumer for it.
            _nonGeneratedTags.remove(tag);
        }
        throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
    }

    ConsumerTarget_0_8 target;
    EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
    final boolean multiQueue = sources.size() > 1;

    if (arguments != null && Boolean.TRUE.equals(arguments.get(AMQPFilterTypes.NO_CONSUME.getValue())))
    {
        // Browser: no ACQUIRES / SEES_REQUEUES options are set for it.
        target = ConsumerTarget_0_8.createBrowserTarget(this, tag, arguments,
                                                        INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
    }
    else if (acks)
    {
        target = ConsumerTarget_0_8.createAckTarget(this, tag, arguments, _creditManager, multiQueue);
        options.add(ConsumerOption.ACQUIRES);
        options.add(ConsumerOption.SEES_REQUEUES);
    }
    else
    {
        target = ConsumerTarget_0_8.createNoAckTarget(this, tag, arguments,
                                                      INFINITE_CREDIT_CREDIT_MANAGER, multiQueue);
        options.add(ConsumerOption.ACQUIRES);
        options.add(ConsumerOption.SEES_REQUEUES);
    }

    if (exclusive)
    {
        options.add(ConsumerOption.EXCLUSIVE);
    }

    // The target is put into the tag map BEFORE it is registered with the sources: asynchronous
    // delivery may auto-close the subscriber, which removes the tag from the map, and that removal
    // must not happen before the put. Consequently any failure while registering has to be caught
    // below so the map entry (and the tag reservation) can be tidied up again.
    _tag2SubscriptionTargetMap.put(tag, target);

    try
    {
        FilterManager filterManager = FilterManagerFactory.createManager(FieldTable.convertToMap(arguments));
        if (noLocal)
        {
            if (filterManager == null)
            {
                filterManager = new FilterManager();
            }
            MessageFilter filter = new NoLocalFilter();
            filterManager.add(filter.getName(), filter);
        }

        if (arguments != null && arguments.containsKey(AMQPFilterTypes.REPLAY_PERIOD.toString()))
        {
            final long period =
                    parseReplayPeriodArgument(arguments.get(AMQPFilterTypes.REPLAY_PERIOD.toString()));
            // The period is given in seconds; convert it into an absolute start time in milliseconds.
            final long startingFrom = System.currentTimeMillis() - (1000L * period);
            if (filterManager == null)
            {
                filterManager = new FilterManager();
            }
            // NOTE(review): the second argument is true only for a period of 0 - presumably it
            // selects special handling of a zero period inside ArrivalTimeFilter; confirm there.
            MessageFilter filter = new ArrivalTimeFilter(startingFrom, period == 0);
            filterManager.add(filter.getName(), filter);
        }

        Integer priority = null;
        if (arguments != null && arguments.containsKey("x-priority"))
        {
            priority = parseConsumerPriorityArgument(arguments.get("x-priority"));
        }

        // NOTE(review): if a later source rejects the consumer, the target appears to stay
        // registered with the sources that already accepted it - confirm whether callers or the
        // target itself undo those registrations.
        for (MessageSource source : sources)
        {
            source.addConsumer(target,
                               filterManager,
                               AMQMessage.class,
                               AMQShortString.toString(tag),
                               options, priority);
        }
        target.updateNotifyWorkDesired();
    }
    catch (AccessControlException
            | MessageSource.ExistingExclusiveConsumer
            | MessageSource.ExistingConsumerPreventsExclusive
            | MessageSource.QueueDeleted
            | AMQInvalidArgumentException
            | MessageSource.ConsumerAccessRefused e)
    {
        // Registration failed: forget the target and release the tag (a no-op for generated tags).
        _tag2SubscriptionTargetMap.remove(tag);
        _nonGeneratedTags.remove(tag);
        throw e;
    }
    return tag;
}

/**
 * Converts the value of the replay period consume argument into a number of seconds.
 *
 * @param value the raw argument value; a {@code Number} or a numeric {@code String} is accepted
 * @return the replay period
 * @throws AMQInvalidArgumentException if the value is of another type or is not a parsable number
 */
private static long parseReplayPeriodArgument(final Object value) throws AMQInvalidArgumentException
{
    if (value instanceof Number)
    {
        return ((Number) value).longValue();
    }
    if (value instanceof String)
    {
        try
        {
            return Long.parseLong(value.toString());
        }
        catch (NumberFormatException e)
        {
            throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
        }
    }
    throw new AMQInvalidArgumentException("Cannot parse value " + value + " as a number for filter " + AMQPFilterTypes.REPLAY_PERIOD.toString());
}

/**
 * Converts the value of the {@code x-priority} consume argument into a consumer priority.
 *
 * @param value the raw argument value; a {@code Number}, {@code String} or {@code AMQShortString}
 * @return the priority, or {@code null} when the value is of another type or is not a parsable
 *         integer, in which case the caller passes {@code null} on to {@code addConsumer}
 */
private static Integer parseConsumerPriorityArgument(final Object value)
{
    if (value instanceof Number)
    {
        return ((Number) value).intValue();
    }
    if (value instanceof String || value instanceof AMQShortString)
    {
        try
        {
            return Integer.parseInt(value.toString());
        }
        catch (NumberFormatException ignored)
        {
            // Deliberately ignored: an unparsable priority falls back to the default value.
        }
    }
    return null;
}