in src/AmqpConnection.cs [553:608]
void OnReceiveSessionFrame(Frame frame)
{
AmqpSession session = null;
Performative command = frame.Command;
ushort channel = frame.Channel;
if (command.DescriptorCode == Begin.Code)
{
Begin begin = (Begin)command;
if (begin.RemoteChannel.HasValue)
{
// reply to begin
lock (this.ThisLock)
{
if (!this.sessionsByLocalHandle.TryGetObject(begin.RemoteChannel.Value, out session))
{
throw new AmqpException(AmqpErrorCode.NotFound, AmqpResources.GetString(AmqpResources.AmqpChannelNotFound, begin.RemoteChannel.Value, this));
}
session.RemoteChannel = channel;
this.sessionsByRemoteHandle.Add(channel, session);
}
}
else
{
// new begin request
AmqpSessionSettings settings = AmqpSessionSettings.Create(begin);
settings.RemoteChannel = channel;
session = this.SessionFactory.CreateSession(this, settings);
this.AddSession(session, channel);
}
}
else
{
if (!this.sessionsByRemoteHandle.TryGetObject((uint)channel, out session))
{
if (command.DescriptorCode == End.Code ||
command.DescriptorCode == Detach.Code ||
this.Settings.IgnoreMissingSessions)
{
// The session close may timed out already
AmqpTrace.Provider.AmqpMissingHandle(this, "session", channel);
return;
}
throw new AmqpException(AmqpErrorCode.NotFound, AmqpResources.GetString(AmqpResources.AmqpChannelNotFound, channel, this));
}
else if (command.DescriptorCode == End.Code)
{
this.sessionsByRemoteHandle.Remove((uint)channel);
session.RemoteChannel = null;
}
}
session.ProcessFrame(frame);
}