in iothub/device/src/Pipeline/RetryDelegatingHandler.cs [1068:1196]
private async Task HandleDisconnectAsync()
{
if (_isDisposed)
{
if (Logging.IsEnabled)
Logging.Info(this, "Disposed during disconnection.", nameof(HandleDisconnectAsync));
_handleDisconnectCts?.Cancel();
}
try
{
// No timeout on connection being established.
await WaitForTransportClosedAsync().ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Canceled when the transport is being closed by the application.
if (Logging.IsEnabled)
Logging.Info(this, "Transport disconnected: closed by application.", nameof(HandleDisconnectAsync));
_onConnectionStatusChanged(ConnectionStatus.Disabled, ConnectionStatusChangeReason.Client_Close);
return;
}
if (Logging.IsEnabled)
Logging.Info(this, "Transport disconnected: unexpected.", nameof(HandleDisconnectAsync));
await _clientOpenSemaphore.WaitAsync().ConfigureAwait(false);
SetClientTransportStatus(ClientTransportStatus.Closed);
try
{
// This is used to ensure that when NoRetry() policy is enabled, we should not be retrying.
if (!_internalRetryPolicy.RetryStrategy.GetShouldRetry().Invoke(0, new IotHubCommunicationException(), out TimeSpan delay))
{
if (Logging.IsEnabled)
Logging.Info(this, "Transport disconnected: closed by application.", nameof(HandleDisconnectAsync));
_onConnectionStatusChanged(ConnectionStatus.Disconnected, ConnectionStatusChangeReason.Retry_Expired);
return;
}
if (delay > TimeSpan.Zero)
{
await Task.Delay(delay).ConfigureAwait(false);
}
// always reconnect.
_onConnectionStatusChanged(ConnectionStatus.Disconnected_Retrying, ConnectionStatusChangeReason.Communication_Error);
CancellationToken cancellationToken = _handleDisconnectCts?.Token ?? CancellationToken.None;
// This will recover to the state before the disconnect.
await _internalRetryPolicy.RunWithRetryAsync(async () =>
{
if (Logging.IsEnabled)
Logging.Info(this, "Attempting to recover subscriptions.", nameof(HandleDisconnectAsync));
await base.OpenAsync(cancellationToken).ConfigureAwait(false);
var tasks = new List<Task>(4);
// This is to ensure that, if previously enabled, the callback to receive direct methods is recovered.
if (_methodsEnabled)
{
tasks.Add(base.EnableMethodsAsync(cancellationToken));
}
// This is to ensure that, if previously enabled, the callback to receive twin properties is recovered.
if (_twinEnabled)
{
tasks.Add(base.EnableTwinPatchAsync(cancellationToken));
}
// This is to ensure that, if previously enabled, the callback to receive events for modules is recovered.
if (_eventsEnabled)
{
tasks.Add(base.EnableEventReceiveAsync(_isAnEdgeModule, cancellationToken));
}
// This is to ensure that, if previously enabled, the callback to receive C2D messages is recovered.
if (_deviceReceiveMessageEnabled)
{
tasks.Add(base.EnableReceiveMessageAsync(cancellationToken));
}
if (tasks.Any())
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
// Don't check for unhandled C2D messages until the callback (EnableReceiveMessageAsync) is hooked up.
if (_deviceReceiveMessageEnabled)
{
await base.EnsurePendingMessagesAreDeliveredAsync(cancellationToken).ConfigureAwait(false);
}
// Send the request for transport close notification.
_transportClosedTask = HandleDisconnectAsync();
SetClientTransportStatus(ClientTransportStatus.Open);
_onConnectionStatusChanged(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
if (Logging.IsEnabled)
Logging.Info(this, "Subscriptions recovered.", nameof(HandleDisconnectAsync));
},
cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
if (Logging.IsEnabled)
Logging.Error(this, ex.ToString(), nameof(HandleDisconnectAsync));
HandleConnectionStatusExceptions(ex, true);
}
finally
{
try
{
_clientOpenSemaphore?.Release();
}
catch (ObjectDisposedException) when (_isDisposing)
{
if (Logging.IsEnabled)
Logging.Error(this, "Tried releasing twin event subscription semaphore but it has already been disposed by client disposal on a separate thread." +
"Ignoring this exception and continuing with client cleanup.");
}
}
}