DeviceBridge/Services/ConnectionStatusSubscriptionService.cs (60 lines of code) (raw):

// Copyright (c) Microsoft Corporation. All rights reserved.

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using DeviceBridge.Models;
using DeviceBridge.Providers;
using NLog;

namespace DeviceBridge.Services
{
    /// <summary>
    /// CRUD operations for connection status subscriptions. It synchronizes the DB and device client callback update to make sure
    /// that the registered callback always reflects the actual subscription stored in the DB.
    /// The synchronization is separate from data subscriptions, which might take a long time to synchronize due to connection creation.
    /// </summary>
    public class ConnectionStatusSubscriptionService : IConnectionStatusSubscriptionService
    {
        private readonly IStorageProvider _storageProvider;
        private readonly IConnectionManager _connectionManager;
        private readonly ISubscriptionCallbackFactory _subscriptionCallbackFactory;
        private readonly Logger _logger;

        // One binary semaphore per device Id, used to serialize the "write to storage, then update callback" sequence.
        // Entries are created on demand and are never removed by this class.
        private readonly ConcurrentDictionary<string, SemaphoreSlim> _connectionStatusSubscriptionSyncSemaphores = new ConcurrentDictionary<string, SemaphoreSlim>();

        /// <summary>
        /// Creates the service with the dependencies it coordinates.
        /// </summary>
        /// <param name="logger">Logger used for the lock-acquisition messages emitted by this service.</param>
        /// <param name="connectionManager">Connection manager on which connection status callbacks are set and removed.</param>
        /// <param name="storageProvider">Storage provider used to read, write and delete subscriptions.</param>
        /// <param name="subscriptionCallbackFactory">Factory that builds the connection status change callback for a subscription.</param>
        public ConnectionStatusSubscriptionService(Logger logger, IConnectionManager connectionManager, IStorageProvider storageProvider, ISubscriptionCallbackFactory subscriptionCallbackFactory)
        {
            _logger = logger;
            _storageProvider = storageProvider;
            _connectionManager = connectionManager;
            _subscriptionCallbackFactory = subscriptionCallbackFactory;
        }

        /// <summary>
        /// Reads the connection status subscription of a device from the storage provider.
        /// This is a read-only operation and does not take the per-device synchronization lock.
        /// </summary>
        /// <param name="logger">Logger forwarded to the storage provider.</param>
        /// <param name="deviceId">Id of the device whose subscription is requested.</param>
        /// <param name="cancellationToken">Cancellation token forwarded to the storage provider.</param>
        /// <returns>Whatever the storage provider returns for the device's connection status subscription.</returns>
        public async Task<DeviceSubscription> GetConnectionStatusSubscription(Logger logger, string deviceId, CancellationToken cancellationToken)
        {
            return await _storageProvider.GetDeviceSubscription(logger, deviceId, DeviceSubscriptionType.ConnectionStatus, cancellationToken);
        }

        /// <summary>
        /// Creates or updates the connection status subscription of a device in storage and then registers the matching
        /// callback in the connection manager. Both steps run under the per-device lock so that concurrent calls for
        /// the same device cannot interleave.
        /// </summary>
        /// <param name="logger">Logger forwarded to the storage provider.</param>
        /// <param name="deviceId">Id of the device to subscribe.</param>
        /// <param name="callbackUrl">Callback URL stored with the subscription.</param>
        /// <param name="cancellationToken">Cancels both the wait for the per-device lock and the storage operation.</param>
        /// <returns>The subscription returned by the storage provider.</returns>
        /// <exception cref="System.OperationCanceledException">The token was canceled while waiting for the per-device lock.</exception>
        public async Task<DeviceSubscription> CreateOrUpdateConnectionStatusSubscription(Logger logger, string deviceId, string callbackUrl, CancellationToken cancellationToken)
        {
            var mutex = GetSyncSemaphore(deviceId);

            // The wait is intentionally outside the try block: if it is canceled, the semaphore was never acquired
            // and must not be released.
            await mutex.WaitAsync(cancellationToken);
            try
            {
                _logger.Info("Acquired connection status subscription sync lock for device {deviceId}", deviceId);
                var subscription = await _storageProvider.CreateOrUpdateDeviceSubscription(logger, deviceId, DeviceSubscriptionType.ConnectionStatus, callbackUrl, cancellationToken);

                // Only reached if the storage write did not throw, so the callback is built from the stored subscription.
                _connectionManager.SetConnectionStatusCallback(deviceId, _subscriptionCallbackFactory.GetConnectionStatusChangeCallback(deviceId, subscription));
                return subscription;
            }
            finally
            {
                mutex.Release();
            }
        }

        /// <summary>
        /// Deletes the connection status subscription of a device from storage and then removes the callback from the
        /// connection manager. Both steps run under the per-device lock so that concurrent calls for the same device
        /// cannot interleave.
        /// </summary>
        /// <param name="logger">Logger forwarded to the storage provider.</param>
        /// <param name="deviceId">Id of the device to unsubscribe.</param>
        /// <param name="cancellationToken">Cancels both the wait for the per-device lock and the storage operation.</param>
        /// <returns>A task that completes once the subscription is deleted and the callback is removed.</returns>
        /// <exception cref="System.OperationCanceledException">The token was canceled while waiting for the per-device lock.</exception>
        public async Task DeleteConnectionStatusSubscription(Logger logger, string deviceId, CancellationToken cancellationToken)
        {
            var mutex = GetSyncSemaphore(deviceId);

            // See CreateOrUpdateConnectionStatusSubscription: a canceled wait must not reach the Release in finally.
            await mutex.WaitAsync(cancellationToken);
            try
            {
                _logger.Info("Acquired connection status subscription sync lock for device {deviceId}", deviceId);
                await _storageProvider.DeleteDeviceSubscription(logger, deviceId, DeviceSubscriptionType.ConnectionStatus, cancellationToken);

                // Only reached if the storage delete did not throw.
                _connectionManager.RemoveConnectionStatusCallback(deviceId);
            }
            finally
            {
                mutex.Release();
            }
        }

        /// <summary>
        /// Returns the synchronization semaphore for a device, creating it on first use.
        /// </summary>
        /// <param name="deviceId">Id of the device whose semaphore is requested.</param>
        /// <returns>The semaphore stored in the dictionary for <paramref name="deviceId"/>.</returns>
        private SemaphoreSlim GetSyncSemaphore(string deviceId)
        {
            // The factory overload avoids allocating a throwaway SemaphoreSlim on every call when the entry already exists.
            return _connectionStatusSubscriptionSyncSemaphores.GetOrAdd(deviceId, _ => new SemaphoreSlim(1, 1));
        }
    }
}