DeviceBridge/Services/DataSubscriptionService.cs (44 lines of code) (raw):

// Copyright (c) Microsoft Corporation. All rights reserved.

using System.Threading;
using System.Threading.Tasks;
using DeviceBridge.Models;
using DeviceBridge.Providers;
using NLog;

namespace DeviceBridge.Services
{
    /// <summary>
    /// CRUD operations for data (C2D) subscriptions.
    /// This module takes care of the storage of data subscriptions and hands over all connection
    /// management operations asynchronously to the scheduler.
    /// </summary>
    public class DataSubscriptionService : IDataSubscriptionService
    {
        private readonly Logger _logger;
        private readonly IStorageProvider _storageProvider;
        private readonly ISubscriptionScheduler _subscriptionScheduler;

        /// <summary>
        /// Creates the service with its collaborators.
        /// </summary>
        /// <param name="logger">Logger used for failures of the background synchronization, which is not tied to a single call.</param>
        /// <param name="storageProvider">Provider through which subscriptions are read, written and deleted.</param>
        /// <param name="subscriptionScheduler">Scheduler that computes subscription status and runs the DB/engine synchronization.</param>
        public DataSubscriptionService(Logger logger, IStorageProvider storageProvider, ISubscriptionScheduler subscriptionScheduler)
        {
            _logger = logger;
            _storageProvider = storageProvider;
            _subscriptionScheduler = subscriptionScheduler;
        }

        /// <summary>
        /// Looks up a data subscription of the given type for a device and attaches its current status.
        /// </summary>
        /// <param name="logger">Logger forwarded to the storage provider for this call.</param>
        /// <param name="deviceId">Id of the device that owns the subscription.</param>
        /// <param name="subscriptionType">Type of the subscription to fetch.</param>
        /// <param name="cancellationToken">Token forwarded to the storage provider.</param>
        /// <returns>The subscription together with the status computed by the scheduler, or <c>null</c> if the storage provider returned no subscription.</returns>
        public async Task<DeviceSubscriptionWithStatus> GetDataSubscription(Logger logger, string deviceId, DeviceSubscriptionType subscriptionType, CancellationToken cancellationToken)
        {
            var stored = await _storageProvider.GetDeviceSubscription(logger, deviceId, subscriptionType, cancellationToken);

            // Nothing stored for this device/type: there is no status to compute.
            if (stored == null)
            {
                return null;
            }

            return new DeviceSubscriptionWithStatus(stored)
            {
                Status = _subscriptionScheduler.ComputeDataSubscriptionStatus(deviceId, stored.SubscriptionType, stored.CallbackUrl),
            };
        }

        /// <summary>
        /// Stores (creates or updates) a data subscription and then triggers, without awaiting it,
        /// a synchronization of the device's DB subscriptions with the connection state.
        /// </summary>
        /// <param name="logger">Logger forwarded to the storage provider for this call.</param>
        /// <param name="deviceId">Id of the device that owns the subscription.</param>
        /// <param name="subscriptionType">Type of the subscription to create or update.</param>
        /// <param name="callbackUrl">Callback URL to store for the subscription.</param>
        /// <param name="cancellationToken">Token forwarded to the storage provider.</param>
        /// <returns>The stored subscription together with the status computed by the scheduler after the synchronization was started.</returns>
        public async Task<DeviceSubscriptionWithStatus> CreateOrUpdateDataSubscription(Logger logger, string deviceId, DeviceSubscriptionType subscriptionType, string callbackUrl, CancellationToken cancellationToken)
        {
            var stored = await _storageProvider.CreateOrUpdateDeviceSubscription(logger, deviceId, subscriptionType, callbackUrl, cancellationToken);

            // Kick off the synchronization before computing the status; it is not awaited.
            StartBackgroundSynchronization(deviceId);

            return new DeviceSubscriptionWithStatus(stored)
            {
                Status = _subscriptionScheduler.ComputeDataSubscriptionStatus(deviceId, stored.SubscriptionType, stored.CallbackUrl),
            };
        }

        /// <summary>
        /// Deletes a data subscription from storage and then triggers, without awaiting it,
        /// a synchronization of the device's DB subscriptions with the connection state.
        /// </summary>
        /// <param name="logger">Logger forwarded to the storage provider for this call.</param>
        /// <param name="deviceId">Id of the device that owns the subscription.</param>
        /// <param name="subscriptionType">Type of the subscription to delete.</param>
        /// <param name="cancellationToken">Token forwarded to the storage provider.</param>
        /// <returns>A task that completes once the storage delete has finished and the synchronization has been started.</returns>
        public async Task DeleteDataSubscription(Logger logger, string deviceId, DeviceSubscriptionType subscriptionType, CancellationToken cancellationToken)
        {
            await _storageProvider.DeleteDeviceSubscription(logger, deviceId, subscriptionType, cancellationToken);

            StartBackgroundSynchronization(deviceId);
        }

        /// <summary>
        /// Asks the scheduler to synchronize the device's DB subscriptions with the engine and
        /// attaches a continuation that logs the failure if that task faults. The resulting task
        /// is intentionally not awaited or returned.
        /// </summary>
        /// <param name="deviceId">Id of the device to synchronize.</param>
        private void StartBackgroundSynchronization(string deviceId)
        {
            // The instance logger is used here (not a per-call one) because the continuation
            // may run after the calling operation has already returned.
            _subscriptionScheduler
                .SynchronizeDeviceDbAndEngineDataSubscriptionsAsync(deviceId)
                .ContinueWith(
                    faulted => _logger.Error(faulted.Exception, "Failed to synchronize DB subscriptions and connection state for device {deviceId}", deviceId),
                    TaskContinuationOptions.OnlyOnFaulted);
        }
    }
}