in dotnet/src/Azure.Iot.Operations.Mqtt/Session/MqttSessionClient.cs [397:560]
/// <summary>
/// Attempts to establish a connection, looping until an attempt completes, the client is closed,
/// a fatal error is encountered, or the retry policy declines to retry.
/// </summary>
/// <remarks>
/// This function is called either when initially connecting the client or when reconnecting it, and it
/// behaves differently in each context. Thrown exceptions are the expected behavior when called from the
/// initial ConnectAsync path, but any exception thrown on the reconnection path would be unhandled and
/// may crash the client, so that path reports failures via <c>FinalizeSessionAsync</c> and returns
/// <c>null</c> instead.
/// </remarks>
/// <param name="options">The options passed to each connection attempt.</param>
/// <param name="lastDisconnect">
/// The disconnect event that triggered reconnection, or <c>null</c> when this is the initial connection.
/// </param>
/// <param name="cancellationToken">Cancels the retry delay and the connection attempts.</param>
/// <returns>
/// The result of the connection attempt that completed without throwing, or <c>null</c> when
/// reconnection was abandoned (client closing, fatal error, retry policy exhausted, or session lost).
/// </returns>
/// <exception cref="RetryExpiredException">
/// Thrown on the initial connection path when the retry policy declines to retry.
/// </exception>
private async Task<MqttClientConnectResult?> MaintainConnectionAsync(MqttClientOptions options, MqttClientDisconnectedEventArgs? lastDisconnect, CancellationToken cancellationToken)
{
    bool isReconnection = lastDisconnect != null;
    uint attemptCount = 1;
    MqttClientConnectResult? mostRecentConnectResult = null;
    Exception? lastException = lastDisconnect?.Exception;
    TimeSpan retryDelay = TimeSpan.Zero;

    while (true)
    {
        // This flag signals that the user is trying to close the connection. If this happens while the client is
        // reconnecting, simply abandon reconnecting and end this task.
        if (_isClosing && isReconnection)
        {
            return null;
        }
        else if (_isClosing && lastException != null)
        {
            // If the user disconnects the client while they were trying to connect it,
            // stop trying to connect it and just report the most recent error.
            throw lastException;
        }

        // On the initial connection, only retry after a failed attempt if the user opted in.
        if (!isReconnection && attemptCount > 1 && !_sessionClientOptions.RetryOnFirstConnect)
        {
            Debug.Assert(lastException != null);
            throw lastException;
        }

        if (IsFatal(lastException!, _reconnectionCancellationToken?.Token.IsCancellationRequested ?? cancellationToken.IsCancellationRequested))
        {
            Trace.TraceError("Encountered a fatal exception while maintaining connection {0}", lastException);

            if (isReconnection)
            {
                var retryException = new RetryExpiredException("A fatal error was encountered while trying to re-establish the session, so this request cannot be completed.", lastException);

                // This function was called to reconnect after an unexpected disconnect. Since the error is fatal,
                // notify the user via callback that the client has crashed, but don't throw the exception since
                // this task is unmonitored.
                await FinalizeSessionAsync(retryException, lastDisconnect!, cancellationToken).ConfigureAwait(false);
                return null;
            }
            else
            {
                // This function was called directly by the user via ConnectAsync, so just throw the exception.
                throw lastException!;
            }
        }

        // Always consult the retry policy when reconnecting, but only consult it on attempt > 1 when
        // initially connecting
        if ((isReconnection || attemptCount > 1)
            && !_sessionClientOptions.ConnectionRetryPolicy.ShouldRetry(attemptCount, lastException!, out retryDelay))
        {
            Trace.TraceError("Retry policy was exhausted while trying to maintain a connection {0}", lastException);
            var retryException = new RetryExpiredException("Retry policy has been exhausted. See inner exception for the latest exception encountered while retrying.", lastException);

            if (lastDisconnect != null)
            {
                // This function was called to reconnect after an unexpected disconnect. Since the retry policy
                // will not allow another attempt, notify the user via callback that the session has ended, but
                // don't throw the exception since this task is unmonitored.
                var disconnectedEventArgs = new MqttClientDisconnectedEventArgs(
                    lastDisconnect.ClientWasConnected,
                    mostRecentConnectResult,
                    lastDisconnect.Reason,
                    lastDisconnect.ReasonString,
                    lastDisconnect.UserProperties,
                    retryException);

                await FinalizeSessionAsync(retryException, disconnectedEventArgs, cancellationToken).ConfigureAwait(false);
                return null;
            }
            else
            {
                // This function was called directly by the user via ConnectAsync, so just throw the exception.
                throw retryException;
            }
        }

        // With all the above conditions checked, the client should attempt to connect again after a delay
        try
        {
            if (retryDelay.CompareTo(TimeSpan.Zero) > 0)
            {
                Trace.TraceInformation("Waiting {0} before next reconnect attempt", retryDelay);
                await Task.Delay(retryDelay, cancellationToken).ConfigureAwait(false);
            }

            cancellationToken.ThrowIfCancellationRequested();

            Trace.TraceInformation($"Trying to connect. Attempt number {attemptCount}");

            if (isReconnection || _sessionClientOptions.RetryOnFirstConnect)
            {
                // When retrying is possible, bound each individual attempt by the configured timeout so that
                // one stalled attempt cannot block the retry loop.
                using CancellationTokenSource reconnectionTimeoutCancellationToken = new();
                reconnectionTimeoutCancellationToken.CancelAfter(_sessionClientOptions.ConnectionAttemptTimeout);
                using CancellationTokenSource linkedCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, reconnectionTimeoutCancellationToken.Token);
                mostRecentConnectResult = await TryEstablishConnectionAsync(options, linkedCancellationToken.Token).ConfigureAwait(false);
            }
            else
            {
                mostRecentConnectResult = await TryEstablishConnectionAsync(options, cancellationToken).ConfigureAwait(false);
            }

            // If the connection was re-established, but the session was lost, report it as a fatal error to the user and disconnect from the broker.
            if (mostRecentConnectResult != null
                && mostRecentConnectResult.ResultCode == MqttClientConnectResultCode.Success
                && isReconnection
                && !mostRecentConnectResult.IsSessionPresent)
            {
                var disconnectedArgs = new MqttClientDisconnectedEventArgs(
                    true,
                    null,
                    MqttClientDisconnectReason.NormalDisconnection,
                    "The session client re-established the connection, but the MQTT broker no longer had the session.",
                    null,
                    new MqttSessionExpiredException());

                MqttSessionExpiredException queuedItemException = new("The queued request has been cancelled because the session is no longer present");

                // Finalize the session exactly once for this session-loss event.
                await FinalizeSessionAsync(queuedItemException, disconnectedArgs, cancellationToken).ConfigureAwait(false);
                Trace.TraceError("Reconnection succeeded, but the session was lost so the client closed the connection.");

                // The provided cancellation token will be cancelled while disconnecting, so don't pass it along
                await DisconnectAsync(null, CancellationToken.None).ConfigureAwait(false);

                // Reconnection should end because the session was lost
                return null;
            }

            if (isReconnection)
            {
                Trace.TraceInformation("Reconnection finished after successfully connecting to the MQTT broker again and re-joining the existing MQTT session.");
            }

            if (mostRecentConnectResult != null
                && mostRecentConnectResult.ResultCode == MqttClientConnectResultCode.Success)
            {
                StartPublishingSubscribingAndUnsubscribing();
            }

            return mostRecentConnectResult;
        }
        catch (Exception) when (_isClosing && isReconnection)
        {
            // This happens when reconnecting if the user attempts to manually disconnect the session client. When
            // that happens, we simply want to end the reconnection logic and let the thread end without throwing.
            Trace.TraceInformation("Session client reconnection cancelled because the client is being closed.");
            return null;
        }
        catch (Exception e)
        {
            // Remember the failure so the checks at the top of the loop can decide whether to retry.
            lastException = e;
            Trace.TraceWarning($"Encountered an exception while connecting. May attempt to reconnect. {e}");
        }

        attemptCount++;
    }
}