in provisioning/transport/amqp/src/AmqpClientConnection.cs [54:126]
/// <summary>
/// Establishes the transport to the host in <c>_uri</c> and opens an AMQP connection on top of it.
/// </summary>
/// <remarks>
/// TLS-over-TCP transport settings are always built and stored in <c>TransportSettings</c>.
/// With <paramref name="useWebSocket"/> set, a web socket transport is created and, if a
/// <see cref="SaslTransportProvider"/> is registered in <c>_amqpSettings</c>, the protocol header is
/// written and the negotiated transport is awaited and opened. Otherwise an
/// <see cref="AmqpTransportInitiator"/> connects directly using <c>TransportSettings</c>.
/// On success <c>IsConnectionClosed</c> is set to <c>false</c>.
/// </remarks>
/// <param name="useWebSocket">True to connect over a web socket; false to connect with the TCP/TLS initiator.</param>
/// <param name="clientCert">Client certificate assigned to the TLS transport settings.</param>
/// <param name="proxy">Proxy passed to the web socket transport factory; not used on the TCP path.</param>
/// <param name="remoteCerificateValidationCallback">Callback assigned to the TLS settings for validating the remote certificate.</param>
/// <param name="cancellationToken">Token flowed to the transport and connection open calls.</param>
/// <returns>A task that completes once the AMQP connection has been opened.</returns>
public async Task OpenAsync(
    bool useWebSocket,
    X509Certificate2 clientCert,
    IWebProxy proxy,
    RemoteCertificateValidationCallback remoteCerificateValidationCallback,
    CancellationToken cancellationToken)
{
    if (Logging.IsEnabled)
    {
        Logging.Enter(this, $"{nameof(AmqpClientConnection)}.{nameof(OpenAsync)}");
    }

    string hostName = _uri.Host;

    // Uri.Port is -1 when no port can be determined; fall back to the default secure AMQP port.
    var tcpSettings = new TcpTransportSettings
    {
        Host = hostName,
        Port = _uri.Port != -1 ? _uri.Port : AmqpConstants.DefaultSecurePort,
    };

    TransportSettings = new TlsTransportSettings(tcpSettings)
    {
        TargetHost = hostName,
        Certificate = clientCert,
        CertificateValidationCallback = remoteCerificateValidationCallback,
    };

    if (useWebSocket)
    {
        _transport = await CreateClientWebSocketTransportAsync(proxy, cancellationToken).ConfigureAwait(false);

        SaslTransportProvider provider = _amqpSettings.GetTransportProvider<SaslTransportProvider>();
        if (provider != null)
        {
            if (Logging.IsEnabled)
            {
                Logging.Info(this, $"{nameof(AmqpClientConnection)}.{nameof(OpenAsync)}: Using SaslTransport");
            }

            _sentHeader = new ProtocolHeader(provider.ProtocolId, provider.DefaultVersion);
            using var buffer = new ByteBuffer(new byte[AmqpConstants.ProtocolHeaderSize]);
            _sentHeader.Encode(buffer);

            // Run continuations asynchronously so that the code awaiting _tcs.Task below (which goes on to
            // open the transport and the AMQP connection) is not executed inline on the thread that
            // completes the source - presumably a transport I/O callback.
            _tcs = new TaskCompletionSource<TransportBase>(TaskCreationOptions.RunContinuationsAsynchronously);

            var args = new TransportAsyncCallbackArgs();
            args.SetBuffer(buffer.Buffer, buffer.Offset, buffer.Length);
            args.CompletedCallback = OnWriteHeaderComplete;
            args.Transport = _transport;

            bool operationPending = _transport.WriteAsync(args);

            if (Logging.IsEnabled)
            {
                Logging.Info(
                    this,
                    $"{nameof(AmqpClientConnection)}.{nameof(OpenAsync)}: " +
                    $"Sent Protocol Header: {_sentHeader} operationPending: {operationPending} completedSynchronously: {args.CompletedSynchronously}");
            }

            // A false return means the write is not pending, so the completion callback is invoked here.
            if (!operationPending)
            {
                args.CompletedCallback(args);
            }

            // NOTE(review): this wait does not observe cancellationToken; it relies on _tcs being
            // completed elsewhere (presumably by OnWriteHeaderComplete or what it chains to) - confirm.
            _transport = await _tcs.Task.ConfigureAwait(false);
            await _transport.OpenAsync(cancellationToken).ConfigureAwait(false);
        }
    }
    else
    {
        var tcpInitiator = new AmqpTransportInitiator(_amqpSettings, TransportSettings);
        _transport = await tcpInitiator.ConnectAsync(cancellationToken).ConfigureAwait(false);
    }

    AmqpConnection = new AmqpConnection(_transport, _amqpSettings, AmqpConnectionSettings);
    AmqpConnection.Closed += OnConnectionClosed;
    await AmqpConnection.OpenAsync(cancellationToken).ConfigureAwait(false);
    IsConnectionClosed = false;
}