in iothub/device/src/Transport/Amqp/AmqpConnectionHolder.cs [177:242]
/// <summary>
/// Returns the AMQP connection held by this instance, opening a new one when no connection
/// is stored or the stored one reports that it is closing.
/// </summary>
/// <remarks>
/// The whole check-and-create sequence runs while <c>_lock</c> is held, and the lock is always
/// released before the method returns or throws. When the device identity uses
/// <see cref="AuthenticationModel.SasGrouped"/>, an authentication refresher is created on the
/// new connection's CBS link and its loop is initialized before the connection is stored.
/// If a non-fatal exception occurs, the refresher created by this call (if any) is stopped,
/// the connection opened by this call (if any) is closed, and the exception is rethrown.
/// </remarks>
/// <param name="cancellationToken">
/// Token passed to the lock wait, to the connection open call and to the refresher initialization.
/// </param>
/// <returns>The stored connection when it is usable; otherwise the newly opened connection.</returns>
/// <exception cref="TimeoutException">
/// Thrown when waiting for <c>_lock</c> ends with an <see cref="OperationCanceledException"/>.
/// </exception>
public async Task<AmqpIotConnection> EnsureConnectionAsync(CancellationToken cancellationToken)
{
    if (Logging.IsEnabled)
    {
        Logging.Enter(this, nameof(EnsureConnectionAsync));
    }

    try
    {
        await _lock.WaitAsync(cancellationToken).ConfigureAwait(false);
    }
    catch (OperationCanceledException)
    {
        // Cancellation while waiting for the lock is surfaced to callers as a timeout.
        throw new TimeoutException();
    }

    // Only resources referenced by these locals are cleaned up on failure.
    AmqpIotConnection connection = null;
    IAmqpAuthenticationRefresher refresher = null;

    try
    {
        if (_amqpIotConnection != null && !_amqpIotConnection.IsClosing())
        {
            // The stored connection is still usable; hand it back as-is.
            connection = _amqpIotConnection;
        }
        else
        {
            if (Logging.IsEnabled)
            {
                Logging.Info(this, "Creating new AmqpConnection", nameof(EnsureConnectionAsync));
            }

            connection = await _amqpIotConnector.OpenConnectionAsync(cancellationToken).ConfigureAwait(false);

            if (_deviceIdentity.AuthenticationModel == AuthenticationModel.SasGrouped)
            {
                if (Logging.IsEnabled)
                {
                    Logging.Info(this, "Creating connection wide AmqpAuthenticationRefresher", nameof(EnsureConnectionAsync));
                }

                refresher = new AmqpAuthenticationRefresher(_deviceIdentity, connection.GetCbsLink());
                await refresher.InitLoopAsync(cancellationToken).ConfigureAwait(false);
            }

            // Publish to the fields only after the connection (and refresher, if any) are fully set up.
            _amqpIotConnection = connection;
            _amqpAuthenticationRefresher = refresher;
            _amqpIotConnection.Closed += OnConnectionClosed;

            if (Logging.IsEnabled)
            {
                Logging.Associate(this, _amqpIotConnection, nameof(_amqpIotConnection));
            }
        }
    }
    catch (Exception ex) when (!ex.IsFatal())
    {
        // Tear down whatever this call managed to create, then let the original exception propagate.
        if (refresher != null)
        {
            await refresher.StopLoopAsync().ConfigureAwait(false);
        }

        connection?.SafeClose();
        throw;
    }
    finally
    {
        _lock.Release();
    }

    if (Logging.IsEnabled)
    {
        Logging.Exit(this, nameof(EnsureConnectionAsync));
    }

    return connection;
}