in src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs [66:124]
public Task Attach()
{
Target target = new Target();
Source source = CreateSource();
Attach attach = new Attach
{
Target = target,
Source = source,
RcvSettleMode = ReceiverSettleMode.First,
SndSettleMode = (info.IsBrowser) ? SenderSettleMode.Settled : SenderSettleMode.Unsettled,
};
string receiverLinkName = null;
string subscriptionName = info.SubscriptionName;
if (!string.IsNullOrEmpty(subscriptionName))
{
AmqpConnection connection = session.Connection;
if (info.IsShared && !connection.Info.SharedSubsSupported) {
validateSharedSubsLinkCapability = true;
}
AmqpSubscriptionTracker subTracker = connection.SubscriptionTracker;
// Validate subscriber type allowed given existing active subscriber types.
if (info.IsShared && info.IsDurable) {
if(subTracker.IsActiveExclusiveDurableSub(subscriptionName)) {
// Don't allow shared sub if there is already an active exclusive durable sub
throw new NMSException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
}
} else if (!info.IsShared && info.IsDurable) {
if (subTracker.IsActiveExclusiveDurableSub(subscriptionName)) {
// Exclusive durable sub is already active
throw new NMSException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
} else if (subTracker.IsActiveSharedDurableSub(subscriptionName)) {
// Don't allow exclusive durable sub if there is already an active shared durable sub
throw new NMSException("A shared durable subscription is already active with name '" + subscriptionName + "'");
}
}
// Get the link name for the subscription. Throws if certain further validations fail.
receiverLinkName = subTracker.ReserveNextSubscriptionLinkName(subscriptionName, info);
}
if (receiverLinkName == null) {
string destinationAddress = source.Address ?? "";
receiverLinkName = "nms:receiver:" + info.Id
+ (destinationAddress.Length == 0 ? "" : (":" + destinationAddress));
}
// TODO: Add timeout
var tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
receiverLink = new ReceiverLink(session.UnderlyingSession, receiverLinkName, attach, HandleOpened(tsc));
receiverLink.AddClosedCallback(HandleClosed(tsc));
return tsc.Task;
}