in src/Transport/AmqpTransportListener.cs [226:283]
void OnProtocolHeader(ByteBuffer buffer)
{
ProtocolHeader header = new ProtocolHeader();
header.Decode(buffer);
AmqpTrace.OnProtocolHeader(header, false);
// Protocol id negotiation
TransportProvider provider = null;
if (!this.parent.settings.TryGetTransportProvider(header, out provider))
{
Fx.Assert(provider != null, "At least on provider should be configured.");
this.WriteReplyHeader(new ProtocolHeader(provider.ProtocolId, provider.DefaultVersion), true);
return;
}
// Protocol version negotiation
AmqpVersion version;
if (!provider.TryGetVersion(header.Version, out version))
{
this.WriteReplyHeader(new ProtocolHeader(provider.ProtocolId, version), true);
return;
}
TransportBase newTransport = null;
try
{
newTransport = provider.CreateTransport(this.args.Transport, false);
}
catch (InvalidOperationException ioe)
{
// treat this the same as protocol ID/version failure
// which are all client config issues
AmqpTrace.Provider.AmqpLogError(this, "CreateTransport", ioe);
this.WriteReplyHeader(new ProtocolHeader(ProtocolId.Amqp, AmqpVersion.V100), true);
return;
}
if (object.ReferenceEquals(newTransport, this.args.Transport))
{
if ((this.parent.settings.RequireSecureTransport && !newTransport.IsSecure) ||
(!this.parent.settings.AllowAnonymousConnection && !newTransport.IsAuthenticated))
{
AmqpTrace.Provider.AmqpInsecureTransport(this.parent, newTransport, newTransport.IsSecure, newTransport.IsAuthenticated);
this.WriteReplyHeader(this.parent.settings.GetDefaultHeader(), true);
}
else
{
this.args.UserToken = header;
this.parent.OnHandleTransportComplete(this.args);
}
}
else
{
AmqpTrace.Provider.AmqpUpgradeTransport(this, args.Transport, newTransport);
this.args.Transport = newTransport;
this.WriteReplyHeader(header, false);
}
}