internal async Task<AmqpIotSession> EnsureSessionIsOpenAsync(CancellationToken cancellationToken)

in iothub/device/src/Transport/Amqp/AmqpUnit.cs [110:187]


        /// <summary>
        /// Returns the AMQP session of this unit. When there is no session yet, or the current one is closing,
        /// a new session is opened together with its telemetry sender link (and, for individual SAS
        /// authentication, an authentication refresher) before returning.
        /// </summary>
        /// <param name="cancellationToken">Token passed to the semaphore wait and to every open operation.</param>
        /// <returns>The session that was current while the session semaphore was held.</returns>
        /// <exception cref="IotHubException">Thrown when this unit is already closed or disposed.</exception>
        /// <exception cref="OperationCanceledException">Thrown when <paramref name="cancellationToken"/> is already canceled on entry.</exception>
        /// <exception cref="TimeoutException">Thrown when waiting for the session semaphore is canceled.</exception>
        internal async Task<AmqpIotSession> EnsureSessionIsOpenAsync(CancellationToken cancellationToken)
        {
            if (_closed)
            {
                throw new IotHubException("Device is now offline.", false);
            }

            if (Logging.IsEnabled)
                Logging.Enter(this, nameof(EnsureSessionIsOpenAsync));

            try
            {
                cancellationToken.ThrowIfCancellationRequested();

                try
                {
                    await _sessionSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
                }
                catch (OperationCanceledException ex)
                {
                    // Callers expect a TimeoutException here; keep the cancellation as the inner exception for diagnostics.
                    throw new TimeoutException("Failed to enter the semaphore required for opening an AMQP session.", ex);
                }

                try
                {
                    // Work on a local snapshot so the value returned is the one validated under the semaphore,
                    // even if the field is changed by another code path after the semaphore is released.
                    AmqpIotSession session = _amqpIotSession;

                    if (session == null || session.IsClosing())
                    {
                        // SafeClose is a fire-and-forget operation. As a result, when it returns the AMQP session might be in closing state
                        // and may still be referenced by its parent object. Adding locks or checks for this isn't possible because the AMQP
                        // library doesn't provide any callbacks for notifying us of the state.
                        // Instead, we have error handling logic when we open sessions.
                        // If the operation throws an exception, the error handling code will determine if it is to be tried, and it will retry, if necessary.
                        session?.SafeClose();

                        AmqpIotSession openedSession = await _amqpConnectionHolder.OpenSessionAsync(_deviceIdentity, cancellationToken).ConfigureAwait(false);
                        _amqpIotSession = openedSession;

                        if (Logging.IsEnabled)
                            Logging.Associate(this, openedSession, nameof(_amqpIotSession));

                        if (_deviceIdentity.AuthenticationModel == AuthenticationModel.SasIndividual)
                        {
                            _amqpAuthenticationRefresher = await _amqpConnectionHolder.CreateRefresherAsync(_deviceIdentity, cancellationToken).ConfigureAwait(false);

                            if (Logging.IsEnabled)
                                Logging.Associate(this, _amqpAuthenticationRefresher, nameof(_amqpAuthenticationRefresher));
                        }

                        openedSession.Closed += OnSessionDisconnected;

                        _messageSendingLink = await openedSession.OpenTelemetrySenderLinkAsync(_deviceIdentity, cancellationToken).ConfigureAwait(false);

                        // Capture the session that owns this link. Reading the _amqpIotSession field when the event fires
                        // could close a newer session (or dereference null) if the field has changed in the meantime.
                        _messageSendingLink.Closed += (obj, arg) =>
                        {
                            openedSession.SafeClose();
                        };

                        if (Logging.IsEnabled)
                            Logging.Associate(this, _messageSendingLink, nameof(_messageSendingLink));

                        session = openedSession;
                    }

                    // Re-check after the awaits above: the unit may have been disposed while the session was being opened.
                    if (_disposed)
                    {
                        throw new IotHubException("Device is now offline.", false);
                    }

                    return session;
                }
                catch (Exception)
                {
                    await CleanupAsync().ConfigureAwait(false);
                    throw;
                }
                finally
                {
                    _sessionSemaphore.Release();
                }
            }
            finally
            {
                // Runs on both success and failure so every Logging.Enter has a matching Logging.Exit.
                if (Logging.IsEnabled)
                    Logging.Exit(this, nameof(EnsureSessionIsOpenAsync));
            }
        }