in DeviceBridge/Services/SubscriptionScheduler.cs [216:302]
/// <summary>
/// Brings the connection-manager callbacks and the connection scheduling state for a device in line
/// with that device's data subscriptions (desired properties, methods and C2D messages).
/// The whole operation runs under a per-device semaphore so that concurrent calls for the same device are serialized.
/// </summary>
/// <param name="deviceId">Id of the device to synchronize.</param>
/// <param name="useInitializationList">
/// When true, the subscriptions are taken from the initialization list instead of being fetched from storage.
/// If the device is not present in that list, the method returns without changing anything.
/// </param>
/// <returns>A task that completes once callbacks and scheduling state have been updated.</returns>
public async Task SynchronizeDeviceDbAndEngineDataSubscriptionsAsync(string deviceId, bool useInitializationList = false)
{
    _logger.Info("Attempting to synchronize DB data subscriptions and internal state for device {deviceId}", deviceId);

    // Synchronizing this code is the only way to guarantee that the current state of the runner will always reflect the latest state in the DB.
    // Otherwise we could end up in an inconsistent state if a subscription is deleted and recreated too fast or if a subscription is modified
    // while we're initializing the device with data fetched from the DB on startup.
    // The value-factory overload is used so that a semaphore is only allocated when the device doesn't have one yet.
    var mutex = _dbAndConnectionStateSyncSemaphores.GetOrAdd(deviceId, _ => new SemaphoreSlim(1, 1));
    await mutex.WaitAsync();

    try
    {
        _logger.Info("Acquired DB and connection state sync lock for device {deviceId}", deviceId);

        List<DeviceSubscription> dataSubscriptions;

        if (useInitializationList)
        {
            // No entry in the initialization list: there is nothing to apply for this device, so leave its state untouched.
            if (!dataSubscriptionsToInitialize.TryGetValue(deviceId, out dataSubscriptions))
            {
                _logger.Info("Subscriptions for Device {deviceId} not found in initialization list", deviceId);
                return;
            }
        }
        else
        {
            // Fetch every subscription of the device and keep only the data subscriptions.
            dataSubscriptions = (await _storageProvider.ListDeviceSubscriptions(_logger, deviceId)).FindAll(s => s.SubscriptionType.IsDataSubscription());
        }

        // Remove the device from the initialization list to mark it as already initialized.
        dataSubscriptionsToInitialize.TryRemove(deviceId, out _);

        // If a desired property subscription exists, register the callback. If not, ensure that the callback doesn't exist.
        var desiredPropertySubscription = dataSubscriptions.Find(s => s.SubscriptionType == DeviceSubscriptionType.DesiredProperties);
        if (desiredPropertySubscription != null)
        {
            await _connectionManager.SetDesiredPropertyUpdateCallbackAsync(deviceId, desiredPropertySubscription.CallbackUrl, _subscriptionCallbackFactory.GetDesiredPropertyUpdateCallback(deviceId, desiredPropertySubscription));
        }
        else
        {
            await _connectionManager.RemoveDesiredPropertyUpdateCallbackAsync(deviceId);
        }

        // If a method subscription exists, register the callback. If not, ensure that the callback doesn't exist.
        var methodSubscription = dataSubscriptions.Find(s => s.SubscriptionType == DeviceSubscriptionType.Methods);
        if (methodSubscription != null)
        {
            await _connectionManager.SetMethodCallbackAsync(deviceId, methodSubscription.CallbackUrl, _subscriptionCallbackFactory.GetMethodCallback(deviceId, methodSubscription));
        }
        else
        {
            await _connectionManager.RemoveMethodCallbackAsync(deviceId);
        }

        // If a C2D subscription exists, register the callback. If not, ensure that the callback doesn't exist.
        var messageSubscription = dataSubscriptions.Find(s => s.SubscriptionType == DeviceSubscriptionType.C2DMessages);
        if (messageSubscription != null)
        {
            await _connectionManager.SetMessageCallbackAsync(deviceId, messageSubscription.CallbackUrl, _subscriptionCallbackFactory.GetReceiveC2DMessageCallback(deviceId, messageSubscription));
        }
        else
        {
            await _connectionManager.RemoveMessageCallbackAsync(deviceId);
        }

        // The device needs a connection constantly open if at least one data subscription exists. If not, the connection can be closed.
        if (dataSubscriptions.Count > 0)
        {
            _hasDataSubscriptions.TryAdd(deviceId, 0 /* placeholder */);

            // Schedule a connection to be opened as soon as possible.
            var notBefore = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
            _scheduledConnectionsNotBefore.AddOrUpdate(deviceId, notBefore, (key, oldValue) => notBefore);
        }
        else
        {
            _hasDataSubscriptions.TryRemove(deviceId, out _);

            // Remove any scheduled connections and close the connection right away.
            _scheduledConnectionsNotBefore.TryRemove(deviceId, out _);
            await _connectionManager.AssertDeviceConnectionClosedAsync(deviceId);
        }
    }
    finally
    {
        // Always release the per-device lock, including on the early return and on exceptions.
        mutex.Release();
    }
}