in src/AmqpSession.cs [117:179]
public void AttachLink(AmqpLink link)
{
Fx.Assert(link.Session == this, "The link is not owned by this session.");
AmqpLink linkToSteal;
lock (this.ThisLock)
{
if (this.IsClosing())
{
throw new InvalidOperationException(AmqpResources.GetString(AmqpResources.AmqpIllegalOperationState, "attach", this.State));
}
if (this.links.TryGetValue(link.LinkIdentifier, out linkToSteal) && link.AllowLinkStealing(linkToSteal.Settings))
{
// Even though link onclose handler already removes the link from the links collections,
// calling Close() is fire and forget, so we will not be waiting for the link onClose handler to trigger
// before trying to add the new link to the links collections down below in this method, therefore remove it now.
this.links.Remove(link.LinkIdentifier);
if (linkToSteal.LocalHandle.HasValue)
{
this.linksByLocalHandle.Remove(linkToSteal.LocalHandle.Value);
}
if (linkToSteal.RemoteHandle.HasValue)
{
this.linksByRemoteHandle.Remove(linkToSteal.RemoteHandle.Value);
}
}
else
{
linkToSteal = null;
}
if (this.Connection.LinkRecoveryEnabled)
{
bool foundTerminus = this.Connection.TerminusStore.TryGetLinkTerminusAsync(link.LinkIdentifier, out AmqpLinkTerminus linkTerminus).Result;
if (linkTerminus == null)
{
linkTerminus = new AmqpLinkTerminus(link.LinkIdentifier, link.Settings, this.Connection.TerminusStore);
if (!this.Connection.TerminusStore.TryAddLinkTerminusAsync(link.LinkIdentifier, linkTerminus).GetAwaiter().GetResult())
{
// There was a race and some other link has already created a link terminus and attach.
// In this case, stop opening of this link and close it due to link stealing.
throw new AmqpException(AmqpErrorCode.Stolen, AmqpResources.GetString(AmqpResources.AmqpLinkStolen, link.LinkIdentifier));
}
}
if (!linkTerminus.TryAssociateLink(link, out linkToSteal))
{
throw new InvalidOperationException("The link terminus to attach this link to is either disposed or link stealing is not allowed.");
}
}
link.Closed += onLinkClosed;
this.links.Add(link.LinkIdentifier, link);
link.LocalHandle = this.linksByLocalHandle.Add(link);
}
linkToSteal?.OnLinkStolen();
AmqpTrace.Provider.AmqpAttachLink(this.connection, this, link, link.LocalHandle.Value,
link.RemoteHandle ?? 0u, link.Name, link.IsReceiver ? "receiver" : "sender", link.Settings.Source, link.Settings.Target);
}