in DeviceBridge/Services/SubscriptionScheduler.cs [48:102]
/// <summary>
/// Builds the scheduler: stores its dependencies, registers the global connection status callback,
/// synchronously loads every stored subscription, buckets the data subscriptions per device into
/// <c>dataSubscriptionsToInitialize</c>, and registers a connection status callback for each
/// connection status subscription.
/// </summary>
/// <param name="logger">Logger used for the startup messages and passed to the storage provider.</param>
/// <param name="connectionManager">Connection manager on which the global and per-device status callbacks are registered.</param>
/// <param name="storageProvider">Storage provider queried for all subscriptions, ordered by device Id.</param>
/// <param name="subscriptionCallbackFactory">Factory that produces the per-device connection status change callbacks.</param>
/// <param name="connectionBatchSize">Stored in <c>_connectionBatchSize</c>; not otherwise used by the constructor.</param>
/// <param name="connectionBatchIntervalMs">Stored in <c>_connectionBatchIntervalMs</c>; not otherwise used by the constructor.</param>
public SubscriptionScheduler(Logger logger, IConnectionManager connectionManager, IStorageProvider storageProvider, ISubscriptionCallbackFactory subscriptionCallbackFactory, uint connectionBatchSize, uint connectionBatchIntervalMs)
{
    _logger = logger;
    _connectionManager = connectionManager;
    _storageProvider = storageProvider;
    _subscriptionCallbackFactory = subscriptionCallbackFactory;
    _connectionBatchSize = connectionBatchSize;
    _connectionBatchIntervalMs = connectionBatchIntervalMs;

    _connectionManager.SetGlobalConnectionStatusCallback(GetRetryGlobalConnectionStatusChangeCallback);

    // Load every subscription that has to be initialized at startup. The task is deliberately blocked on
    // (via .Result) so the list exists before construction completes: requests arriving afterwards can then
    // remove entries from it and keep a device from being initialized with stale data.
    _logger.Info("Attempting to fetch all subscriptions to initialize from DB");
    var allSubscriptions = _storageProvider.ListAllSubscriptionsOrderedByDeviceId(_logger).Result;

    dataSubscriptionsToInitialize = new ConcurrentDictionary<string, List<DeviceSubscription>>();

    // The results arrive grouped by device Id, so consecutive entries are collected into one bucket and the
    // bucket is published only when the device Id changes (or the input ends).
    var dataSubscriptions = allSubscriptions.FindAll(s => s.SubscriptionType.IsDataSubscription());
    string pendingDeviceId = null;
    List<DeviceSubscription> pendingBucket = null;

    foreach (var dataSubscription in dataSubscriptions)
    {
        var subscriptionDeviceId = dataSubscription.DeviceId;

        // Still on the same device: keep filling the current bucket.
        if (subscriptionDeviceId == pendingDeviceId)
        {
            pendingBucket.Add(dataSubscription);
            continue;
        }

        // Moved on to a different device: publish what was collected for the previous one, if any.
        if (pendingDeviceId != null && pendingBucket != null)
        {
            dataSubscriptionsToInitialize.TryAdd(pendingDeviceId, pendingBucket);
        }

        pendingDeviceId = subscriptionDeviceId;
        pendingBucket = new List<DeviceSubscription> { dataSubscription };
    }

    // The final device never sees a "next" Id, so its bucket is published here.
    if (pendingDeviceId != null && pendingBucket != null)
    {
        dataSubscriptionsToInitialize.TryAdd(pendingDeviceId, pendingBucket);
    }

    // Connection status subscriptions are wired up synchronously as part of construction, so they are in
    // place before any connection can be established and no status change event is missed. No locking is
    // used because no concurrent operation is received until the service starts.
    _logger.Info("Attempting to initialize all connection status subscriptions");
    var statusSubscriptions = allSubscriptions.FindAll(s => s.SubscriptionType == DeviceSubscriptionType.ConnectionStatus);

    foreach (var statusSubscription in statusSubscriptions)
    {
        var statusDeviceId = statusSubscription.DeviceId;
        var statusCallback = _subscriptionCallbackFactory.GetConnectionStatusChangeCallback(statusDeviceId, statusSubscription);
        _connectionManager.SetConnectionStatusCallback(statusDeviceId, statusCallback);
    }
}