in iothub/device/src/Transport/Amqp/AmqpUnit.cs [110:187]
internal async Task<AmqpIotSession> EnsureSessionIsOpenAsync(CancellationToken cancellationToken)
{
if (_closed)
{
throw new IotHubException("Device is now offline.", false);
}
if (Logging.IsEnabled)
Logging.Enter(this, nameof(EnsureSessionIsOpenAsync));
cancellationToken.ThrowIfCancellationRequested();
try
{
await _sessionSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
throw new TimeoutException("Failed to enter the semaphore required for opening an AMQP session.");
}
try
{
if (_amqpIotSession == null || _amqpIotSession.IsClosing())
{
// SafeClose is a fire-and-forget operation. As a result, when it returns the AMQP session might be in closing state
// and may still be referenced by its parent object. Adding locks or checks for this isn't possible because the AMQP
// library doesn't provide any callbacks for notifying us of the state.
// Instead, we have error handling logic when we open sessions.
// If the operation throws an exception, the error handling code will determine if it is to be tried, and it will retry, if necessary.
_amqpIotSession?.SafeClose();
_amqpIotSession = await _amqpConnectionHolder.OpenSessionAsync(_deviceIdentity, cancellationToken).ConfigureAwait(false);
if (Logging.IsEnabled)
Logging.Associate(this, _amqpIotSession, nameof(_amqpIotSession));
if (_deviceIdentity.AuthenticationModel == AuthenticationModel.SasIndividual)
{
_amqpAuthenticationRefresher = await _amqpConnectionHolder.CreateRefresherAsync(_deviceIdentity, cancellationToken).ConfigureAwait(false);
if (Logging.IsEnabled)
Logging.Associate(this, _amqpAuthenticationRefresher, nameof(_amqpAuthenticationRefresher));
}
_amqpIotSession.Closed += OnSessionDisconnected;
_messageSendingLink = await _amqpIotSession.OpenTelemetrySenderLinkAsync(_deviceIdentity, cancellationToken).ConfigureAwait(false);
_messageSendingLink.Closed += (obj, arg) =>
{
_amqpIotSession.SafeClose();
};
if (Logging.IsEnabled)
Logging.Associate(this, _messageSendingLink, nameof(_messageSendingLink));
}
if (_disposed)
{
throw new IotHubException("Device is now offline.", false);
}
}
catch (Exception)
{
await CleanupAsync().ConfigureAwait(false);
throw;
}
finally
{
_sessionSemaphore.Release();
}
if (Logging.IsEnabled)
Logging.Exit(this, nameof(EnsureSessionIsOpenAsync));
return _amqpIotSession;
}