public SubscriptionScheduler()

in DeviceBridge/Services/SubscriptionScheduler.cs [48:102]


        /// <summary>
        /// Builds the scheduler: stores its collaborators, registers the global connection status callback,
        /// synchronously loads every stored subscription, groups the data subscriptions per device for later
        /// initialization and registers a connection status callback for each connection status subscription.
        /// </summary>
        /// <param name="logger">Logger used for the startup messages and handed to the storage provider query.</param>
        /// <param name="connectionManager">Connection manager on which the global and per-device status callbacks are set.</param>
        /// <param name="storageProvider">Storage provider queried for all subscriptions, ordered by device Id.</param>
        /// <param name="subscriptionCallbackFactory">Factory producing the per-device connection status change callbacks.</param>
        /// <param name="connectionBatchSize">Value stored in <c>_connectionBatchSize</c>.</param>
        /// <param name="connectionBatchIntervalMs">Value stored in <c>_connectionBatchIntervalMs</c>.</param>
        public SubscriptionScheduler(Logger logger, IConnectionManager connectionManager, IStorageProvider storageProvider, ISubscriptionCallbackFactory subscriptionCallbackFactory, uint connectionBatchSize, uint connectionBatchIntervalMs)
        {
            _logger = logger;
            _connectionManager = connectionManager;
            _storageProvider = storageProvider;
            _subscriptionCallbackFactory = subscriptionCallbackFactory;
            _connectionBatchSize = connectionBatchSize;
            _connectionBatchIntervalMs = connectionBatchIntervalMs;

            _connectionManager.SetGlobalConnectionStatusCallback(GetRetryGlobalConnectionStatusChangeCallback);

            // The startup subscriptions are loaded from the DB with a blocking wait on purpose: the list has to exist
            // before construction completes, so that requests arriving afterwards can remove entries from it and
            // thereby keep a device from being initialized with stale data.
            _logger.Info("Attempting to fetch all subscriptions to initialize from DB");
            var storedSubscriptions = _storageProvider.ListAllSubscriptionsOrderedByDeviceId(_logger).Result;
            dataSubscriptionsToInitialize = new ConcurrentDictionary<string, List<DeviceSubscription>>();

            // Rows arrive grouped by device Id, so a run of equal Ids is collected into one list and that list is
            // stored as soon as a row with a different Id shows up.
            string runDeviceId = null;
            List<DeviceSubscription> runSubscriptions = null;
            var storedDataSubscriptions = storedSubscriptions.FindAll(s => s.SubscriptionType.IsDataSubscription());

            foreach (var dataSubscription in storedDataSubscriptions)
            {
                var ownerDeviceId = dataSubscription.DeviceId;

                if (ownerDeviceId == runDeviceId)
                {
                    // Still inside the current device's run.
                    runSubscriptions.Add(dataSubscription);
                    continue;
                }

                // A new device starts here: persist the finished run (if there is one) before opening the next.
                if (!(runDeviceId == null || runSubscriptions == null))
                {
                    dataSubscriptionsToInitialize.TryAdd(runDeviceId, runSubscriptions);
                }

                runDeviceId = ownerDeviceId;
                runSubscriptions = new List<DeviceSubscription>();
                runSubscriptions.Add(dataSubscription);
            }

            // The final run has no following row to trigger its storage, so it is stored here.
            if (!(runDeviceId == null || runSubscriptions == null))
            {
                dataSubscriptionsToInitialize.TryAdd(runDeviceId, runSubscriptions);
            }

            // Connection status subscriptions are set up synchronously during construction so that they are in place
            // before any connection can be established and no status change event is missed. No lock is taken because
            // no other concurrent operation is received until the service starts.
            _logger.Info("Attempting to initialize all connection status subscriptions");
            var storedStatusSubscriptions = storedSubscriptions.FindAll(s => s.SubscriptionType == DeviceSubscriptionType.ConnectionStatus);

            foreach (var statusSubscription in storedStatusSubscriptions)
            {
                var targetDeviceId = statusSubscription.DeviceId;
                var statusCallback = _subscriptionCallbackFactory.GetConnectionStatusChangeCallback(targetDeviceId, statusSubscription);
                _connectionManager.SetConnectionStatusCallback(targetDeviceId, statusCallback);
            }
        }