private async Task ManageSessionStateMachineAsync()

in src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs [869:1160]


        /// <summary>
        /// Runs the session state machine. Reads (trigger, context) pairs from the
        /// event channel one at a time until the channel completes or
        /// <paramref name="ct"/> is cancelled, and moves the session between the
        /// Disconnected, Connecting, Connected and Reconnecting states, which are
        /// tracked in a local variable. When the loop ends and the state is not
        /// Disconnected, the disconnect handling is run one last time.
        /// </summary>
        /// <remarks>
        /// Exceptions thrown while processing an event are logged and end the loop
        /// without being rethrown. Cancellation is swallowed. Other exceptions that
        /// escape the inner handler (for example while disposing the timer) are
        /// logged and rethrown.
        /// </remarks>
        /// <param name="ct">Token that stops reading events and ends the loop.</param>
        /// <returns>A task that completes once the management loop has exited.</returns>
        private async Task ManageSessionStateMachineAsync(CancellationToken ct)
        {
            var currentSessionState = SessionState.Disconnected;

            // Current connect retry delay in milliseconds, 0 while no connect is in progress.
            var reconnectPeriod = 0;

            // One-shot timer (created disarmed) that posts a ConnectRetry event when it fires.
            var reconnectTimer = _timeProvider.CreateTimer(
                _ => TriggerConnectionEvent(ConnectionEvent.ConnectRetry), null,
                Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
            try
            {
                await using (reconnectTimer.ConfigureAwait(false))
                {
                    try
                    {
                        await foreach (var (trigger, context) in
                            _channel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
                        {
                            _logger.LogDebug("{Client}: Processing event {Event} in State {State}...",
                                this, trigger, currentSessionState);

                            switch (trigger)
                            {
                                case ConnectionEvent.Reset:
                                    if (currentSessionState != SessionState.Connected)
                                    {
                                        // Nothing to reset, complete the caller's task right away.
                                        (context as TaskCompletionSource)?.TrySetResult();
                                        break;
                                    }
                                    // If currently reconnecting, dispose the reconnect handler
                                    _reconnectHandler.CancelReconnect();
                                    //
                                    // Close bypassing everything but keep channel open then trigger a
                                    // reconnect. The reconnect will recreate the session and subscriptions
                                    //
                                    Debug.Assert(_session != null);
                                    await _session.CloseAsync(false, default).ConfigureAwait(false);
                                    goto case ConnectionEvent.StartReconnect;
                                case ConnectionEvent.Connect:
                                    if (currentSessionState == SessionState.Disconnected)
                                    {
                                        // Start connecting
                                        reconnectTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
                                        currentSessionState = SessionState.Connecting;
                                        reconnectPeriod = GetMinReconnectPeriod();
                                    }
                                    goto case ConnectionEvent.ConnectRetry;
                                case ConnectionEvent.ConnectRetry:
                                    switch (currentSessionState)
                                    {
                                        case SessionState.Connecting:
                                            Debug.Assert(_reconnectHandler.State == SessionReconnectHandler.ReconnectState.Ready);
                                            Debug.Assert(_disconnectLock != null);
                                            Debug.Assert(_session == null);

                                            if (!await TryConnectAsync(ct).ConfigureAwait(false) ||
                                                !await TrySyncAsync(ct).ConfigureAwait(false))
                                            {
                                                // Reschedule connecting
                                                await CloseSessionAsync(false).ConfigureAwait(false);
                                                Debug.Assert(reconnectPeriod != 0, "Reconnect period should not be 0.");
                                                var retryDelay = TimeSpan.FromMilliseconds(
                                                    _reconnectHandler.CheckedReconnectPeriod(reconnectPeriod));
                                                _logger.LogInformation("{Client}: Retrying connecting session in {RetryDelay}...",
                                                    this, retryDelay);
                                                // Arm the timer once; it posts ConnectRetry when it elapses.
                                                reconnectTimer.Change(retryDelay, Timeout.InfiniteTimeSpan);
                                                // Period to use for the attempt after the scheduled one.
                                                reconnectPeriod = _reconnectHandler.JitteredReconnectPeriod(reconnectPeriod);
                                                break;
                                            }

                                            Debug.Assert(_session != null);

                                            // Allow access to session now
                                            Debug.Assert(_disconnectLock != null);
                                            _disconnectLock.Dispose();
                                            _disconnectLock = null;

                                            currentSessionState = SessionState.Connected;
                                            NotifySubscriptions(_session, false);
                                            break;
                                        case SessionState.Disconnected:
                                        case SessionState.Connected:
                                            // Nothing to do, already disconnected or connected
                                            break;
                                        case SessionState.Reconnecting:
                                            Debug.Fail("Should not be connecting during reconnecting.");
                                            break;
                                    }
                                    break;
                                case ConnectionEvent.SubscriptionSyncOne:
                                    // The context carries the single subscription to synchronize.
                                    var subscriptionToSync = context as OpcUaSubscription;
                                    Debug.Assert(subscriptionToSync != null);
                                    await SyncAsync(subscriptionToSync, ct).ConfigureAwait(false);
                                    break;
                                case ConnectionEvent.SubscriptionSyncAll:
                                    await TrySyncAsync(ct).ConfigureAwait(false);
                                    break;
                                case ConnectionEvent.StartReconnect: // sent by the keep alive timeout path
                                    switch (currentSessionState)
                                    {
                                        case SessionState.Connected: // only valid when connected.
                                            Debug.Assert(_reconnectHandler.State == SessionReconnectHandler.ReconnectState.Ready);
                                            _logger.LogInformation("{Client}: Reconnecting session {Session} due to {Reason}...",
                                                this, _sessionName, (context is ServiceResult sr) ? "error " + sr : "RESET");

                                            // Ensure no more access to the session through reader locks
                                            Debug.Assert(_disconnectLock == null);
                                            _disconnectLock = await _lock.WriterLockAsync(ct);
                                            _logger.LogInformation("{Client}: Begin reconnecting session {Session}...",
                                                this, _sessionName);
                                            Debug.Assert(_session != null);
                                            // The returned handler state is not needed here; completion
                                            // is reported through the callback as a ReconnectComplete event.
                                            _ = _reconnectHandler.BeginReconnect(_session,
                                                _reverseConnectManager, GetMinReconnectPeriod(), (sender, evt) =>
                                                {
                                                    if (!ReferenceEquals(sender, _reconnectHandler))
                                                    {
                                                        _logger.LogError("{Client}: Reconnect handler mismatch.", this);
                                                        return;
                                                    }
                                                    TriggerConnectionEvent(ConnectionEvent.ReconnectComplete,
                                                        _reconnectHandler.Session);
                                                });

                                            // Save session while reconnecting.
                                            Debug.Assert(_reconnectingSession == null);
                                            _reconnectingSession = _session;
                                            _session = null;
                                            NotifyConnectivityStateChange(EndpointConnectivityState.Connecting);
                                            currentSessionState = SessionState.Reconnecting;
                                            NotifySubscriptions(_reconnectingSession, true);
                                            // Completes the task of a Reset request that jumped here.
                                            (context as TaskCompletionSource)?.TrySetResult();
                                            break;
                                        case SessionState.Connecting:
                                        case SessionState.Disconnected:
                                        case SessionState.Reconnecting:
                                            // Nothing to do in this state
                                            break;
                                    }
                                    break;

                                case ConnectionEvent.ReconnectComplete:
                                    // if session recovered, Session property is not null
                                    var reconnected = _reconnectHandler.Session as OpcUaSession;
                                    switch (currentSessionState)
                                    {
                                        case SessionState.Reconnecting:
                                            _logger.LogInformation("{Client}: Completed reconnecting session {Session}...",
                                                this, _sessionName);
                                            //
                                            // Behavior of the reconnect handler is as follows:
                                            // 1) newSession == null
                                            //  => then the old session is still good, we missed keep alive.
                                            // 2) newSession != null but equal to previous session
                                            //  => new channel was opened but the existing session was reactivated
                                            // 3) newSession != previous Session
                                            //  => everything reconnected and new session was activated.
                                            //
                                            reconnected ??= _reconnectingSession;

                                            Debug.Assert(reconnected != null, "reconnected should never be null");
                                            Debug.Assert(reconnected.Connected, "reconnected should always be connected");

                                            // Handles all 3 cases above.
                                            var isNew = await UpdateSessionAsync(reconnected).ConfigureAwait(false);

                                            Debug.Assert(_session != null);
                                            Debug.Assert(_reconnectingSession == null);
                                            if (!isNew)
                                            {
                                                // Case 1) and 2)
                                                _logger.LogInformation("{Client}: Client RECOVERED!", this);
                                            }
                                            else
                                            {
                                                // Case 3)
                                                _logger.LogInformation("{Client}: Client RECONNECTED!", this);
                                                _numberOfConnectionRetries++;
                                            }

                                            // If not already ready, signal we are ready again and ...
                                            NotifyConnectivityStateChange(EndpointConnectivityState.Ready);
                                            // ... allow access to the client again
                                            Debug.Assert(_disconnectLock != null);
                                            _disconnectLock.Dispose();
                                            _disconnectLock = null;
                                            _reconnectRequired = 0;
                                            reconnectPeriod = GetMinReconnectPeriod();
                                            currentSessionState = SessionState.Connected;

                                            if (!await TrySyncAsync(ct).ConfigureAwait(false))
                                            {
                                                // Subscriptions are not notified as connected in this case;
                                                // another reconnect is requested instead.
                                                TriggerReconnect(StatusCodes.BadNotConnected,
                                                    "Failed to synchronize subscriptions after reconnect.");
                                                break;
                                            }
                                            NotifySubscriptions(_session, false);
                                            break;

                                        case SessionState.Connected:
                                            Debug.Fail("Should not signal reconnected when already connected.");
                                            break;
                                        case SessionState.Connecting:
                                        case SessionState.Disconnected:
                                            // Not waiting for a reconnect in these states, so the
                                            // session handed back by the handler (if any) is disposed.
                                            Debug.Assert(_reconnectingSession == null);
                                            reconnected?.Dispose();
                                            break;
                                    }
                                    break;
                                case ConnectionEvent.Disconnect:
                                    if (currentSessionState != SessionState.Disconnected)
                                    {
                                        await HandleDisconnectEvent(ct).ConfigureAwait(false);
                                        currentSessionState = SessionState.Disconnected;
                                    }
                                    break;
                            }

                            // Argument order must follow the placeholders: client, event, state.
                            _logger.LogDebug("{Client}: Event {Event} in State {State} processed.",
                                this, trigger, currentSessionState);
                        }
                    }
                    catch (OperationCanceledException) { }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "{Client}: Connection manager exited unexpectedly...", this);
                    }
                    finally
                    {
                        _reconnectHandler.CancelReconnect();
                    }
                }
            }
            catch (OperationCanceledException) { }
            catch (Exception ex)
            {
                _logger.LogError(ex, "{Client}: Exception in management loop.", this);
                throw;
            }
            finally
            {
                if (currentSessionState != SessionState.Disconnected)
                {
                    _logger.LogInformation(
                        "{Client}: Disconnect because client is disposed.", this);
                    // The loop token may already be cancelled, so no token is passed here.
                    await HandleDisconnectEvent(default).ConfigureAwait(false);
                    currentSessionState = SessionState.Disconnected;
                }
                _logger.LogInformation("{Client}: Exiting client management loop.", this);
            }

            // Cancels any reconnect, stops the retry timer, takes the writer lock if it is
            // not already held, notifies listeners and subscriptions, then closes the session.
            async ValueTask HandleDisconnectEvent(CancellationToken cancellationToken)
            {
                // If currently reconnecting, dispose the reconnect handler and stop timer
                _reconnectHandler.CancelReconnect();
                reconnectTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

                // if not already disconnected, acquire writer lock
                _disconnectLock ??= await _lock.WriterLockAsync(cancellationToken);
                reconnectPeriod = 0;

                NotifyConnectivityStateChange(EndpointConnectivityState.Disconnected);
                NotifySubscriptions(_session, true);

                await CloseSessionAsync().ConfigureAwait(false);
                Debug.Assert(_session == null);
            }

            // Passes the disconnected flag to every subscription handle of the given
            // session. Does nothing when the session is null.
            static void NotifySubscriptions(OpcUaSession? session, bool disconnected)
            {
                if (session == null)
                {
                    return;
                }
                foreach (var h in session.SubscriptionHandles.Values)
                {
                    h.NotifySessionConnectionState(disconnected);
                }
            }

            // Returns the minimum reconnect delay in milliseconds: MinReconnectDelay, or
            // one second when it is unset or zero, capped at _maxReconnectPeriod.
            int GetMinReconnectPeriod()
            {
                // Named differently from the enclosing method's reconnectPeriod to avoid shadowing.
                var minPeriod = MinReconnectDelay ?? TimeSpan.Zero;
                if (minPeriod == TimeSpan.Zero)
                {
                    minPeriod = TimeSpan.FromSeconds(1);
                }
                if (minPeriod > _maxReconnectPeriod)
                {
                    minPeriod = _maxReconnectPeriod;
                }
                return (int)minPeriod.TotalMilliseconds;
            }
        }