public async Task SynchronizeDeviceDbAndEngineDataSubscriptionsAsync()

in DeviceBridge/Services/SubscriptionScheduler.cs [216:302]


        public async Task SynchronizeDeviceDbAndEngineDataSubscriptionsAsync(string deviceId, bool useInitializationList = false)
        {
            _logger.Info("Attempting to synchronize DB data subscriptions and internal state for device {deviceId}", deviceId);

            // Synchronizing this code is the only way to guarantee that the current state of the runner will always reflect the latest state in the DB.
            // Otherwise we could end up in an inconsistent state if a subscription is deleted and recreated too fast or if a subscription is modified
            // while we're initializing the device with data fetched from the DB on startup.
            var mutex = _dbAndConnectionStateSyncSemaphores.GetOrAdd(deviceId, new SemaphoreSlim(1, 1));
            await mutex.WaitAsync();

            try
            {
                _logger.Info("Acquired DB and connection state sync lock for device {deviceId}", deviceId);
                List<DeviceSubscription> dataSubscriptions;

                if (useInitializationList)
                {
                    if (!dataSubscriptionsToInitialize.TryGetValue(deviceId, out dataSubscriptions))
                    {
                        _logger.Info("Subscriptions for Device {deviceId} not found in initialization list", deviceId);
                        return;
                    }
                }
                else
                {
                    dataSubscriptions = (await _storageProvider.ListDeviceSubscriptions(_logger, deviceId)).FindAll(s => s.SubscriptionType.IsDataSubscription());
                }

                // Remove the device form the initialization list to mark it as already initialized.
                dataSubscriptionsToInitialize.TryRemove(deviceId, out _);

                // If a desired property subscription exists, register the callback. If not, ensure that the callback doesn't exist.
                var desiredPropertySubscription = dataSubscriptions.Find(s => s.SubscriptionType == DeviceSubscriptionType.DesiredProperties);
                if (desiredPropertySubscription != null)
                {
                    await _connectionManager.SetDesiredPropertyUpdateCallbackAsync(deviceId, desiredPropertySubscription.CallbackUrl, _subscriptionCallbackFactory.GetDesiredPropertyUpdateCallback(deviceId, desiredPropertySubscription));
                }
                else
                {
                    await _connectionManager.RemoveDesiredPropertyUpdateCallbackAsync(deviceId);
                }

                // If a method subscription exists, register the callback. If not, ensure that the callback doesn't exist.
                var methodSubscription = dataSubscriptions.Find(s => s.SubscriptionType == DeviceSubscriptionType.Methods);
                if (methodSubscription != null)
                {
                    await _connectionManager.SetMethodCallbackAsync(deviceId, methodSubscription.CallbackUrl, _subscriptionCallbackFactory.GetMethodCallback(deviceId, methodSubscription));
                }
                else
                {
                    await _connectionManager.RemoveMethodCallbackAsync(deviceId);
                }

                // If a C2D subscription exists, register the callback. If not, ensure that the callback doesn't exist.
                var messageSubscription = dataSubscriptions.Find(s => s.SubscriptionType == DeviceSubscriptionType.C2DMessages);
                if (messageSubscription != null)
                {
                    await _connectionManager.SetMessageCallbackAsync(deviceId, messageSubscription.CallbackUrl, _subscriptionCallbackFactory.GetReceiveC2DMessageCallback(deviceId, messageSubscription));
                }
                else
                {
                    await _connectionManager.RemoveMessageCallbackAsync(deviceId);
                }

                // The device needs a connection constantly open if at least one data subscription exists. If not, the connection can be closed.
                if (dataSubscriptions.Count > 0)
                {
                    _hasDataSubscriptions.TryAdd(deviceId, 0 /* placeholder */);

                    // Schedule a connection to be opened as soon as possible.
                    var notBefore = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
                    _scheduledConnectionsNotBefore.AddOrUpdate(deviceId, notBefore, (key, oldValue) => notBefore);
                }
                else
                {
                    _hasDataSubscriptions.TryRemove(deviceId, out _);

                    // Remove any scheduled connections and close the connection right away.
                    _scheduledConnectionsNotBefore.TryRemove(deviceId, out _);
                    await _connectionManager.AssertDeviceConnectionClosedAsync(deviceId);
                }
            }
            finally
            {
                mutex.Release();
            }
        }