in src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs [869:1160]
/// <summary>
/// Runs the session state machine of the client. Connection events are read
/// one at a time from the event channel and processed sequentially, moving the
/// session between the disconnected, connecting, connected and reconnecting
/// states. The loop ends when <paramref name="ct"/> is cancelled, the channel
/// completes, or processing an event throws. On exit a session that is not yet
/// disconnected is disconnected.
/// </summary>
/// <remarks>
/// Exceptions thrown while processing an event are logged and end the loop
/// without being rethrown. Only non-cancellation exceptions escaping the timer
/// scope itself are logged and rethrown to the caller.
/// </remarks>
/// <param name="ct">Token used to stop reading events from the channel and to
/// cancel the operations started while processing an event.</param>
/// <returns>A task that completes when the management loop has exited.</returns>
private async Task ManageSessionStateMachineAsync(CancellationToken ct)
{
    var currentSessionState = SessionState.Disconnected;

    // Current connect retry delay in milliseconds, 0 while disconnected.
    var reconnectPeriod = 0;

    // One shot timer that is armed after a failed connect attempt and
    // posts a ConnectRetry event back into the channel when it fires.
    var reconnectTimer = _timeProvider.CreateTimer(
        _ => TriggerConnectionEvent(ConnectionEvent.ConnectRetry), null,
        Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
    try
    {
        await using (reconnectTimer.ConfigureAwait(false))
        {
            try
            {
                await foreach (var (trigger, context) in
                    _channel.Reader.ReadAllAsync(ct).ConfigureAwait(false))
                {
                    _logger.LogDebug("{Client}: Processing event {Event} in State {State}...",
                        this, trigger, currentSessionState);

                    switch (trigger)
                    {
                        case ConnectionEvent.Reset:
                            if (currentSessionState != SessionState.Connected)
                            {
                                // Nothing to reset, complete the waiter (if any) right away
                                (context as TaskCompletionSource)?.TrySetResult();
                                break;
                            }
                            // If currently reconnecting, dispose the reconnect handler
                            _reconnectHandler.CancelReconnect();
                            //
                            // Close bypassing everything but keep channel open then trigger a
                            // reconnect. The reconnect will recreate the session and subscriptions
                            //
                            Debug.Assert(_session != null);
                            await _session.CloseAsync(false, default).ConfigureAwait(false);
                            goto case ConnectionEvent.StartReconnect;

                        case ConnectionEvent.Connect:
                            if (currentSessionState == SessionState.Disconnected)
                            {
                                // Start connecting
                                reconnectTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
                                currentSessionState = SessionState.Connecting;
                                reconnectPeriod = GetMinReconnectPeriod();
                            }
                            goto case ConnectionEvent.ConnectRetry;

                        case ConnectionEvent.ConnectRetry:
                            switch (currentSessionState)
                            {
                                case SessionState.Connecting:
                                    Debug.Assert(_reconnectHandler.State == SessionReconnectHandler.ReconnectState.Ready);
                                    Debug.Assert(_disconnectLock != null);
                                    Debug.Assert(_session == null);

                                    if (!await TryConnectAsync(ct).ConfigureAwait(false) ||
                                        !await TrySyncAsync(ct).ConfigureAwait(false))
                                    {
                                        // Reschedule connecting
                                        await CloseSessionAsync(false).ConfigureAwait(false);
                                        Debug.Assert(reconnectPeriod != 0, "Reconnect period should not be 0.");
                                        var retryDelay = TimeSpan.FromMilliseconds(
                                            _reconnectHandler.CheckedReconnectPeriod(reconnectPeriod));
                                        _logger.LogInformation("{Client}: Retrying connecting session in {RetryDelay}...",
                                            this, retryDelay);
                                        reconnectTimer.Change(retryDelay, Timeout.InfiniteTimeSpan);
                                        // Period to use for the attempt after this one
                                        reconnectPeriod = _reconnectHandler.JitteredReconnectPeriod(reconnectPeriod);
                                        break;
                                    }

                                    Debug.Assert(_session != null);
                                    // Allow access to session now
                                    Debug.Assert(_disconnectLock != null);
                                    _disconnectLock.Dispose();
                                    _disconnectLock = null;

                                    currentSessionState = SessionState.Connected;
                                    NotifySubscriptions(_session, false);
                                    break;
                                case SessionState.Disconnected:
                                case SessionState.Connected:
                                    // Nothing to do, already disconnected or connected
                                    break;
                                case SessionState.Reconnecting:
                                    Debug.Fail("Should not be connecting during reconnecting.");
                                    break;
                            }
                            break;

                        case ConnectionEvent.SubscriptionSyncOne:
                            // Context carries the single subscription to synchronize
                            var subscriptionToSync = context as OpcUaSubscription;
                            Debug.Assert(subscriptionToSync != null);
                            await SyncAsync(subscriptionToSync, ct).ConfigureAwait(false);
                            break;

                        case ConnectionEvent.SubscriptionSyncAll:
                            await TrySyncAsync(ct).ConfigureAwait(false);
                            break;

                        case ConnectionEvent.StartReconnect: // sent by the keep alive timeout path
                            switch (currentSessionState)
                            {
                                case SessionState.Connected: // only valid when connected.
                                    Debug.Assert(_reconnectHandler.State == SessionReconnectHandler.ReconnectState.Ready);
                                    _logger.LogInformation("{Client}: Reconnecting session {Session} due to {Reason}...",
                                        this, _sessionName, (context is ServiceResult sr) ? "error " + sr : "RESET");

                                    // Ensure no more access to the session through reader locks
                                    Debug.Assert(_disconnectLock == null);
                                    _disconnectLock = await _lock.WriterLockAsync(ct);

                                    _logger.LogInformation("{Client}: Begin reconnecting session {Session}...",
                                        this, _sessionName);

                                    Debug.Assert(_session != null);
                                    var state = _reconnectHandler.BeginReconnect(_session,
                                        _reverseConnectManager, GetMinReconnectPeriod(), (sender, evt) =>
                                        {
                                            // Only accept completions raised by our own handler
                                            if (!ReferenceEquals(sender, _reconnectHandler))
                                            {
                                                _logger.LogError("{Client}: Reconnect handler mismatch.", this);
                                                return;
                                            }

                                            TriggerConnectionEvent(ConnectionEvent.ReconnectComplete,
                                                _reconnectHandler.Session);
                                        });

                                    // Save session while reconnecting.
                                    Debug.Assert(_reconnectingSession == null);
                                    _reconnectingSession = _session;
                                    _session = null;
                                    NotifyConnectivityStateChange(EndpointConnectivityState.Connecting);
                                    currentSessionState = SessionState.Reconnecting;
                                    NotifySubscriptions(_reconnectingSession, true);
                                    // Completes the waiter when we got here through a Reset event
                                    (context as TaskCompletionSource)?.TrySetResult();
                                    break;
                                case SessionState.Connecting:
                                case SessionState.Disconnected:
                                case SessionState.Reconnecting:
                                    // Nothing to do in this state
                                    break;
                            }
                            break;

                        case ConnectionEvent.ReconnectComplete:
                            // if session recovered, Session property is not null
                            var reconnected = _reconnectHandler.Session as OpcUaSession;
                            switch (currentSessionState)
                            {
                                case SessionState.Reconnecting:
                                    _logger.LogInformation("{Client}: Completed reconnecting session {Session}...",
                                        this, _sessionName);
                                    //
                                    // Behavior of the reconnect handler is as follows:
                                    // 1) newSession == null
                                    //    => then the old session is still good, we missed keep alive.
                                    // 2) newSession != null but equal to previous session
                                    //    => new channel was opened but the existing session was reactivated
                                    // 3) newSession != previous Session
                                    //    => everything reconnected and new session was activated.
                                    //
                                    reconnected ??= _reconnectingSession;

                                    Debug.Assert(reconnected != null, "reconnected should never be null");
                                    Debug.Assert(reconnected.Connected, "reconnected should always be connected");

                                    // Handles all 3 cases above.
                                    var isNew = await UpdateSessionAsync(reconnected).ConfigureAwait(false);

                                    Debug.Assert(_session != null);
                                    Debug.Assert(_reconnectingSession == null);
                                    if (!isNew)
                                    {
                                        // Case 1) and 2)
                                        _logger.LogInformation("{Client}: Client RECOVERED!", this);
                                    }
                                    else
                                    {
                                        // Case 3)
                                        _logger.LogInformation("{Client}: Client RECONNECTED!", this);
                                        _numberOfConnectionRetries++;
                                    }

                                    // If not already ready, signal we are ready again and ...
                                    NotifyConnectivityStateChange(EndpointConnectivityState.Ready);
                                    // ... allow access to the client again
                                    Debug.Assert(_disconnectLock != null);
                                    _disconnectLock.Dispose();
                                    _disconnectLock = null;

                                    _reconnectRequired = 0;
                                    reconnectPeriod = GetMinReconnectPeriod();
                                    currentSessionState = SessionState.Connected;

                                    // State is already Connected here so that a failed sync
                                    // can request another reconnect cycle.
                                    if (!await TrySyncAsync(ct).ConfigureAwait(false))
                                    {
                                        TriggerReconnect(StatusCodes.BadNotConnected,
                                            "Failed to synchronize subscriptions after reconnect.");
                                        break;
                                    }
                                    NotifySubscriptions(_session, false);
                                    break;

                                case SessionState.Connected:
                                    Debug.Fail("Should not signal reconnected when already connected.");
                                    break;
                                case SessionState.Connecting:
                                case SessionState.Disconnected:
                                    // Late completion, nobody owns the session anymore
                                    Debug.Assert(_reconnectingSession == null);
                                    reconnected?.Dispose();
                                    break;
                            }
                            break;

                        case ConnectionEvent.Disconnect:
                            if (currentSessionState != SessionState.Disconnected)
                            {
                                await HandleDisconnectEvent(ct).ConfigureAwait(false);
                                currentSessionState = SessionState.Disconnected;
                            }
                            break;
                    }

                    // Arguments must follow the order of the placeholders in the template
                    _logger.LogDebug("{Client}: Event {Event} in State {State} processed.",
                        this, trigger, currentSessionState);
                }
            }
            catch (OperationCanceledException) { }
            catch (Exception ex)
            {
                _logger.LogError(ex, "{Client}: Connection manager exited unexpectedly...", this);
            }
            finally
            {
                _reconnectHandler.CancelReconnect();
            }
        }
    }
    catch (OperationCanceledException) { }
    catch (Exception ex)
    {
        _logger.LogError(ex, "{Client}: Exception in management loop.", this);
        throw;
    }
    finally
    {
        if (currentSessionState != SessionState.Disconnected)
        {
            _logger.LogInformation(
                "{Client}: Disconnect because client is disposed.", this);

            // NOTE(review): the reconnect timer scope above has already ended when
            // this runs and HandleDisconnectEvent calls Change on the timer -
            // confirm the timer implementation tolerates Change after dispose.
            await HandleDisconnectEvent(default).ConfigureAwait(false);
            currentSessionState = SessionState.Disconnected;
        }

        _logger.LogInformation("{Client}: Exiting client management loop.", this);
    }

    // Cancels a pending reconnect, stops the retry timer, takes the writer lock
    // if it is not already held, notifies listeners and closes the session.
    async ValueTask HandleDisconnectEvent(CancellationToken cancellationToken)
    {
        // If currently reconnecting, dispose the reconnect handler and stop timer
        _reconnectHandler.CancelReconnect();
        reconnectTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);

        // if not already disconnected, acquire writer lock
        _disconnectLock ??= await _lock.WriterLockAsync(cancellationToken);

        reconnectPeriod = 0;

        NotifyConnectivityStateChange(EndpointConnectivityState.Disconnected);
        NotifySubscriptions(_session, true);

        await CloseSessionAsync().ConfigureAwait(false);

        Debug.Assert(_session == null);
    }

    // Tells every subscription handle of the session whether the session is
    // now disconnected. Does nothing when there is no session.
    static void NotifySubscriptions(OpcUaSession? session, bool disconnected)
    {
        if (session == null)
        {
            return;
        }
        foreach (var h in session.SubscriptionHandles.Values)
        {
            h.NotifySessionConnectionState(disconnected);
        }
    }

    // Returns the initial reconnect period in milliseconds: the configured
    // minimum delay (1 second if missing or zero) capped at the maximum period.
    int GetMinReconnectPeriod()
    {
        // Named differently from the captured reconnectPeriod of the enclosing
        // method so the two cannot be confused.
        var period = MinReconnectDelay ?? TimeSpan.Zero;
        if (period == TimeSpan.Zero)
        {
            period = TimeSpan.FromSeconds(1);
        }
        if (period > _maxReconnectPeriod)
        {
            period = _maxReconnectPeriod;
        }
        return (int)period.TotalMilliseconds;
    }
}