public async Task AssertDeviceConnectionOpenAsync()

in DeviceBridge/Services/ConnectionManager.cs [165:314]


        public async Task AssertDeviceConnectionOpenAsync(string deviceId, bool temporary = false, CancellationToken? cancellationToken = null)
        {
            _logger.Info("Attempting to initialize {connectionType} connection for device {deviceId}", temporary ? "Temporary" : "Permanent", deviceId);
            _lastConnectionAttempt.AddOrUpdate(deviceId, DateTime.Now, (key, oldValue) => DateTime.Now);

            var mutex = _clientSemaphores.GetOrAdd(deviceId, new SemaphoreSlim(1, 1));
            await mutex.WaitAsync();

            try
            {
                _logger.Info("Acquired connection lock for device {deviceId}", deviceId);

                if (temporary)
                {
                    // Always renew the connection duration, as the user wants to assert that this connection will live for a few minutes (even if a previous temporary connection exists).
                    // We use a random factor to spread out when temporary connections expire.
                    var shouldLiveUntil = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + new Random().Next(TemporaryConnectionMinDurationSeconds, TemporaryConnectionMaxDurationSeconds);
                    _hasTemporaryConnectionUntil.AddOrUpdate(deviceId, shouldLiveUntil, (key, oldValue) => shouldLiveUntil);
                    _logger.Info("Temporary connection for device {deviceId} set to live at least until {shouldLiveUntil}", deviceId, DateTimeOffset.FromUnixTimeSeconds(shouldLiveUntil).UtcDateTime);
                }
                else
                {
                    _hasPermanentConnection.AddOrUpdate(deviceId, true, (key, oldValue) => true);
                }

                // Dispose the current client if it is in a permanent failure state.
                if (_clientStatuses.TryGetValue(deviceId, out (ConnectionStatus status, ConnectionStatusChangeReason reason) currentStatus))
                {
                    // Permanent failure state, taken from https://github.com/Azure-Samples/azure-iot-samples-csharp/tree/master/iot-hub/Samples/device/DeviceReconnectionSample
                    bool isFailed = currentStatus.status == ConnectionStatus.Disconnected;

                    if (isFailed && _clients.TryRemove(deviceId, out DeviceClient existingClient))
                    {
                        _logger.Info("Disposing existing failed client for device {deviceId}", deviceId);
                        await existingClient.CloseAsync();
                        existingClient.Dispose();
                        existingClient.SetConnectionStatusChangesHandler(null);
                    }
                }

                if (_clients.TryGetValue(deviceId, out DeviceClient _))
                {
                    _logger.Info("Connection for device {deviceId} already exists", deviceId);
                    return;
                }

                var deviceKey = ComputeDerivedSymmetricKey(Convert.FromBase64String(_sasKey), deviceId);

                // If we already know this device's hub, attempt to connect to it first.
                if (_deviceHubs.TryGetValue(deviceId, out string knownDeviceHub))
                {
                    try
                    {
                        var client = await BuildAndOpenClient(_logger, knownDeviceHub, deviceKey, cancellationToken);
                        _clients.AddOrUpdate(deviceId, client, (key, oldValue) => client);
                        return;
                    }
                    catch (Exception e)
                    {
                        _logger.Error(e, "Failed to connect device {deviceId} to it's old hub ({knownDeviceHub}). Will try DPS registration again.", deviceId, knownDeviceHub);
                    }
                }

                // If connecting to the cached Hub failed, try DPS registration.
                {
                    var deviceHub = await DpsRegisterInternalAsync(_logger, deviceId, deviceKey, null, cancellationToken);
                    _deviceHubs.AddOrUpdate(deviceId, deviceHub, (key, oldValue) => deviceHub);

                    try
                    {
                        await _storageProvider.AddOrUpdateHubCacheEntry(_logger, deviceId, deviceHub);
                    }
                    catch (Exception e)
                    {
                        // Storing the hub is a best-effort operation.
                        _logger.Error(e, "Failed to update Hub cache for device {deviceId}", deviceId);
                    }

                    var client = await BuildAndOpenClient(_logger, deviceHub, deviceKey, cancellationToken);
                    _clients.AddOrUpdate(deviceId, client, (key, oldValue) => client);
                }
            }
            finally
            {
                mutex.Release();
            }

            async Task<DeviceClient> BuildAndOpenClient(Logger logger, string candidateHub, string deviceKey, CancellationToken? cancellationToken = null)
            {
                _logger.Info("Attempting to connect device {deviceId} to hub {candidateHub}", deviceId, candidateHub);
                DeviceClient client = null;

                try
                {
                    var settings = new ITransportSettings[]
                    {
                        new AmqpTransportSettings(TransportType.Amqp_Tcp_Only)
                        {
                            AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings()
                            {
                                Pooling = true,
                                MaxPoolSize = _maxPoolSize,
                            },
                        },
                    };

                    var connString = GetDeviceConnectionString(deviceId, candidateHub, deviceKey);
                    client = DeviceClient.CreateFromConnectionString(connString, settings);
                    client.SetConnectionStatusChangesHandler(BuildConnectionStatusChangeHandler(deviceId));
                    client.SetRetryPolicy(new ExponentialBackoff(ClientRetryCount, TimeSpan.FromMilliseconds(ClientRetryMinBackoffMs), TimeSpan.FromSeconds(ClientRetryMaxBackoffSec), TimeSpan.FromMilliseconds(ClientRetryDeltaBackoffMs)));

                    // If a desired property callback exists, register it.
                    if (_desiredPropertyUpdateCallbacks.TryGetValue(deviceId, out var desiredPropertyUpdateCallback))
                    {
                        await client.SetDesiredPropertyUpdateCallbackAsync(desiredPropertyUpdateCallback.callback, null);
                    }

                    // If a method callback exists, register it.
                    if (_methodCallbacks.TryGetValue(deviceId, out var methodCallback))
                    {
                        await client.SetMethodDefaultHandlerAsync(methodCallback.callback, null);
                    }

                    // If a C2DMessage callback exists, register it.
                    if (_messageCallbacks.TryGetValue(deviceId, out var messageCallback))
                    {
                        await client.SetReceiveMessageHandlerAsync(messageCallback.callback, null);
                    }

                    if (cancellationToken.HasValue)
                    {
                        await client.OpenAsync(cancellationToken.Value);
                    }
                    else
                    {
                        await client.OpenAsync();
                    }

                    _logger.Info("Device {deviceId} connected to hub {candidateHub}", deviceId, candidateHub);
                    return client;
                }
                catch (Exception e)
                {
                    // Dispose of the failed client to make sure it doesn't retry internally.
                    client?.Dispose();
                    client?.SetConnectionStatusChangesHandler(null);
                    throw e;
                }
            }
        }