DeviceBridge/Services/SubscriptionScheduler.cs (287 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DeviceBridge.Models;
using DeviceBridge.Providers;
using Microsoft.Azure.Devices.Client;
using NLog;
namespace DeviceBridge.Services
{
/// <summary>
/// Synchronizes the data (C2D) subscriptions of devices and their internal connection state, including:
/// - Initialization of existing subscriptions at service startup.
/// - Management of long-lived connections and retries on persistent connection failures (due to cloud-side scaling, disaster, and Hub moves).
/// - Connection rate limiting.
/// - Computation of subscription status based on internal connection state.
/// </summary>
public class SubscriptionScheduler : ISubscriptionScheduler
{
public const uint DefaultConnectionBatchSize = 150; // Maximum number of connections to start in a given execution of the connection scheduler.
public const uint DefaultConnectionBatchIntervalMs = 1000; // Interval between executions of the connection scheduler.
private const int ConnectionBackoffPerFailedAttemptSeconds = 5 * 60; // 5 minutes
private const int MaxConnectionBackoffSeconds = 30 * 60; // 30 minutes
private const string SubscriptionStatusStarting = "Starting";
private const string SubscriptionStatusRunning = "Running";
private const string SubscriptionStatusStopped = "Stopped";
private readonly uint _connectionBatchSize;
private readonly uint _connectionBatchIntervalMs;
private readonly IStorageProvider _storageProvider;
private readonly IConnectionManager _connectionManager;
private readonly ISubscriptionCallbackFactory _subscriptionCallbackFactory;
private readonly Logger _logger;
private readonly ConcurrentDictionary<string, List<DeviceSubscription>> dataSubscriptionsToInitialize;
private ConcurrentDictionary<string, SemaphoreSlim> _dbAndConnectionStateSyncSemaphores = new ConcurrentDictionary<string, SemaphoreSlim>();
private ConcurrentDictionary<string, long> _scheduledConnectionsNotBefore = new ConcurrentDictionary<string, long>(); // Stores which devices have a scheduled connection and the earliest timestamp that connection should be attempted (not before).
private ConcurrentDictionary<string, byte> _hasDataSubscriptions = new ConcurrentDictionary<string, byte>(); // Stores which device Ids have data subscriptions. NOTE: used as workaround for absence of a ConcurrentHashSet (as recommended)
private ConcurrentDictionary<string, int> _consecutiveConnectionFailures = new ConcurrentDictionary<string, int>(); // Stores how many consecutive times a device failed to connect. NOTE: used as workaround for absence of a ConcurrentHashSet (as recommended)
public SubscriptionScheduler(Logger logger, IConnectionManager connectionManager, IStorageProvider storageProvider, ISubscriptionCallbackFactory subscriptionCallbackFactory, uint connectionBatchSize, uint connectionBatchIntervalMs)
{
_logger = logger;
_storageProvider = storageProvider;
_connectionManager = connectionManager;
_subscriptionCallbackFactory = subscriptionCallbackFactory;
_connectionBatchSize = connectionBatchSize;
_connectionBatchIntervalMs = connectionBatchIntervalMs;
_connectionManager.SetGlobalConnectionStatusCallback(GetRetryGlobalConnectionStatusChangeCallback);
// Fetch from DB all subscriptions to be initialized on service startup. This must be done synchronously before
// the service instance is fully constructed, so subsequent requests received by the service can prevent a device
// from being initialized with stale data by removing items from this list.
_logger.Info("Attempting to fetch all subscriptions to initialize from DB");
var allSubscriptions = _storageProvider.ListAllSubscriptionsOrderedByDeviceId(_logger).Result;
dataSubscriptionsToInitialize = new ConcurrentDictionary<string, List<DeviceSubscription>>();
string currentDeviceId = null;
List<DeviceSubscription> currentDeviceDataSubscriptions = null;
foreach (var subscription in allSubscriptions.FindAll(s => s.SubscriptionType.IsDataSubscription()))
{
var deviceId = subscription.DeviceId;
// Since results are grouped by device Id, we store the results of a device once we move to the next one.
if (deviceId != currentDeviceId)
{
if (currentDeviceId != null && currentDeviceDataSubscriptions != null)
{
dataSubscriptionsToInitialize.TryAdd(currentDeviceId, currentDeviceDataSubscriptions);
}
currentDeviceId = deviceId;
currentDeviceDataSubscriptions = new List<DeviceSubscription>();
}
currentDeviceDataSubscriptions.Add(subscription);
}
// Store the results for the last device.
if (currentDeviceId != null && currentDeviceDataSubscriptions != null)
{
dataSubscriptionsToInitialize.TryAdd(currentDeviceId, currentDeviceDataSubscriptions);
}
// Synchronously initialize all connection status subscriptions before the service is fully constructed. This ensures that
// subscriptions are in place before any connection can be established, so no status change events are missed.
// No lock is needed, since no other concurrent operation is received until the service starts.
_logger.Info("Attempting to initialize all connection status subscriptions");
foreach (var connectionStatusSubscription in allSubscriptions.FindAll(s => s.SubscriptionType == DeviceSubscriptionType.ConnectionStatus))
{
_connectionManager.SetConnectionStatusCallback(connectionStatusSubscription.DeviceId, _subscriptionCallbackFactory.GetConnectionStatusChangeCallback(connectionStatusSubscription.DeviceId, connectionStatusSubscription));
}
}
/// <summary>
/// Determines the status of a subscription based on the current state of the device client.
/// </summary>
/// <param name="deviceId">Id of the device for which to check the subscription status.</param>
/// <param name="subscriptionType">Type of the subscription that we want the status for.</param>
/// <param name="callbackUrl">URL for which we want to check the subscription status.</param>
/// <returns>Status of the subscription.</returns>
public string ComputeDataSubscriptionStatus(string deviceId, DeviceSubscriptionType subscriptionType, string callbackUrl)
{
// If the callback URL in storage does not match the one currently registered in the client, we can assume that the engine
// is still trying to synchronize this subscription (either it was just created or it's being initialized at startup).
if ((subscriptionType == DeviceSubscriptionType.DesiredProperties && _connectionManager.GetCurrentDesiredPropertyUpdateCallbackId(deviceId) != callbackUrl) ||
(subscriptionType == DeviceSubscriptionType.Methods && _connectionManager.GetCurrentMethodCallbackId(deviceId) != callbackUrl) ||
(subscriptionType == DeviceSubscriptionType.C2DMessages && _connectionManager.GetCurrentMessageCallbackId(deviceId) != callbackUrl))
{
return SubscriptionStatusStarting;
}
var deviceStatus = _connectionManager.GetDeviceStatus(deviceId);
if (deviceStatus?.status == ConnectionStatus.Connected)
{
// Device is connected and callback is registered, so the subscription is running.
return SubscriptionStatusRunning;
}
else if (deviceStatus?.status == ConnectionStatus.Disconnected || deviceStatus?.status == ConnectionStatus.Disabled)
{
// Callbacks match, but the device is disconnected and the SDK won't automatically retry, so the subscription is permanently stopped.
return SubscriptionStatusStopped;
}
else
{
// If the device is not explicitly connected or disconnected, we can assume that the SDK is retrying or a client is being created.
return SubscriptionStatusStarting;
}
}
/// <summary>
/// Starts the Initialization of data subscriptions for all devices based on the list fetched from the DB at service construction time.
/// For use during service startup.
/// </summary>
public async Task StartDataSubscriptionsInitializationAsync()
{
_logger.Info("Attempting to initialize subscriptions for all devices");
var deviceIds = dataSubscriptionsToInitialize.Keys.ToList();
// Synchronizes one batch of devices at a time. This avoids the creation of too many async tasks simultaneously.
for (int i = 0; i < deviceIds.Count; ++i)
{
var deviceId = deviceIds[i];
var _ = SynchronizeDeviceDbAndEngineDataSubscriptionsAsync(deviceId, true).ContinueWith(t => _logger.Error(t.Exception, "Failed to initialize DB subscriptions for device {deviceId}", deviceId), TaskContinuationOptions.OnlyOnFaulted);
if ((i + 1) % _connectionBatchSize == 0 && (i + 1) < deviceIds.Count)
{
_logger.Info("Waiting {subscriptionFullSyncBatchIntervalMs} ms before syncing subscriptions for next {subscriptionFullSyncBatchSize} devices", _connectionBatchIntervalMs, _connectionBatchSize);
await Task.Delay((int)_connectionBatchIntervalMs);
}
}
_logger.Info("Successfully initialized subscriptions from DB for {deviceCount} devices", deviceIds.Count);
}
/// <summary>
/// The scheduler starts a batch of scheduled connections in each interval.
/// </summary>
public async Task StartSubscriptionSchedulerAsync()
{
_logger.Info("Started subscription scheduler task");
while (true)
{
try
{
var currentTime = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
// Start any scheduled connections whose 'not before' timestamp has already expired (up to the maximum batch size).
int startedConnections = 0;
foreach (var entry in _scheduledConnectionsNotBefore)
{
if (currentTime >= entry.Value)
{
_scheduledConnectionsNotBefore.TryRemove(entry.Key, out long _);
var _ = AttemptDeviceConnection(entry.Key).ContinueWith(t => _logger.Error(t.Exception, "Failed to issue scheduled connection attempt for device {deviceId}", entry.Key), TaskContinuationOptions.OnlyOnFaulted);
startedConnections++;
}
if (startedConnections >= _connectionBatchSize)
{
break;
}
}
if (startedConnections > 0)
{
_logger.Info("Subscription scheduler started {connectionCount} connections.", startedConnections);
}
}
catch (Exception e)
{
_logger.Error(e, "Failed to execute subscription scheduler");
}
// Trigger the next execution
await Task.Delay((int)_connectionBatchIntervalMs);
}
}
/// <summary>
/// Triggers a synchronization of the internal state (connection and callbacks) for a device reflects the subscriptions status and callbacks stored in the DB or in the initialization list.
/// </summary>
/// <param name="deviceId">Id of the device to synchronize subscriptions for.</param>
/// <param name="useInitializationList">Whether subscriptions should be pulled from the initialization list or fetched from the DB.</param>
public async Task SynchronizeDeviceDbAndEngineDataSubscriptionsAsync(string deviceId, bool useInitializationList = false)
{
_logger.Info("Attempting to synchronize DB data subscriptions and internal state for device {deviceId}", deviceId);
// Synchronizing this code is the only way to guarantee that the current state of the runner will always reflect the latest state in the DB.
// Otherwise we could end up in an inconsistent state if a subscription is deleted and recreated too fast or if a subscription is modified
// while we're initializing the device with data fetched from the DB on startup.
var mutex = _dbAndConnectionStateSyncSemaphores.GetOrAdd(deviceId, new SemaphoreSlim(1, 1));
await mutex.WaitAsync();
try
{
_logger.Info("Acquired DB and connection state sync lock for device {deviceId}", deviceId);
List<DeviceSubscription> dataSubscriptions;
if (useInitializationList)
{
if (!dataSubscriptionsToInitialize.TryGetValue(deviceId, out dataSubscriptions))
{
_logger.Info("Subscriptions for Device {deviceId} not found in initialization list", deviceId);
return;
}
}
else
{
dataSubscriptions = (await _storageProvider.ListDeviceSubscriptions(_logger, deviceId)).FindAll(s => s.SubscriptionType.IsDataSubscription());
}
// Remove the device form the initialization list to mark it as already initialized.
dataSubscriptionsToInitialize.TryRemove(deviceId, out _);
// If a desired property subscription exists, register the callback. If not, ensure that the callback doesn't exist.
var desiredPropertySubscription = dataSubscriptions.Find(s => s.SubscriptionType == DeviceSubscriptionType.DesiredProperties);
if (desiredPropertySubscription != null)
{
await _connectionManager.SetDesiredPropertyUpdateCallbackAsync(deviceId, desiredPropertySubscription.CallbackUrl, _subscriptionCallbackFactory.GetDesiredPropertyUpdateCallback(deviceId, desiredPropertySubscription));
}
else
{
await _connectionManager.RemoveDesiredPropertyUpdateCallbackAsync(deviceId);
}
// If a method subscription exists, register the callback. If not, ensure that the callback doesn't exist.
var methodSubscription = dataSubscriptions.Find(s => s.SubscriptionType == DeviceSubscriptionType.Methods);
if (methodSubscription != null)
{
await _connectionManager.SetMethodCallbackAsync(deviceId, methodSubscription.CallbackUrl, _subscriptionCallbackFactory.GetMethodCallback(deviceId, methodSubscription));
}
else
{
await _connectionManager.RemoveMethodCallbackAsync(deviceId);
}
// If a C2D subscription exists, register the callback. If not, ensure that the callback doesn't exist.
var messageSubscription = dataSubscriptions.Find(s => s.SubscriptionType == DeviceSubscriptionType.C2DMessages);
if (messageSubscription != null)
{
await _connectionManager.SetMessageCallbackAsync(deviceId, messageSubscription.CallbackUrl, _subscriptionCallbackFactory.GetReceiveC2DMessageCallback(deviceId, messageSubscription));
}
else
{
await _connectionManager.RemoveMessageCallbackAsync(deviceId);
}
// The device needs a connection constantly open if at least one data subscription exists. If not, the connection can be closed.
if (dataSubscriptions.Count > 0)
{
_hasDataSubscriptions.TryAdd(deviceId, 0 /* placeholder */);
// Schedule a connection to be opened as soon as possible.
var notBefore = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
_scheduledConnectionsNotBefore.AddOrUpdate(deviceId, notBefore, (key, oldValue) => notBefore);
}
else
{
_hasDataSubscriptions.TryRemove(deviceId, out _);
// Remove any scheduled connections and close the connection right away.
_scheduledConnectionsNotBefore.TryRemove(deviceId, out _);
await _connectionManager.AssertDeviceConnectionClosedAsync(deviceId);
}
}
finally
{
mutex.Release();
}
}
private async Task AttemptDeviceConnection(string deviceId)
{
_logger.Info("Starting scheduled connection attempt for device {deviceId}.", deviceId);
// Synchronization is needed to avoid the race condition of a device connection being closed between the time we check
// if a connection should be open (the device has subscriptions) and the connection open call being actually issued.
var mutex = _dbAndConnectionStateSyncSemaphores.GetOrAdd(deviceId, new SemaphoreSlim(1, 1));
await mutex.WaitAsync();
try
{
if (!_hasDataSubscriptions.TryGetValue(deviceId, out _))
{
_logger.Info("Skipping scheduled connection attempt for device {deviceId} as it no longer has data subscriptions.", deviceId);
return;
}
await _connectionManager.AssertDeviceConnectionOpenAsync(deviceId, false /* permanent */);
_consecutiveConnectionFailures.TryRemove(deviceId, out int _);
_logger.Info("Successfully opened scheduled connection for device {deviceId}.", deviceId);
}
catch (Exception e)
{
int failedAttempts;
if (_consecutiveConnectionFailures.TryGetValue(deviceId, out failedAttempts))
{
failedAttempts++;
}
else
{
failedAttempts = 1;
}
_consecutiveConnectionFailures.AddOrUpdate(deviceId, failedAttempts, (key, oldValue) => failedAttempts);
// A failed connection attempt already includes the regular device client retries and a DPS registration attempt,
// so we back off after each failed attempt, up to 30 minutes.
var backoff = new Random().Next(0, Math.Min(ConnectionBackoffPerFailedAttemptSeconds * failedAttempts, MaxConnectionBackoffSeconds));
_logger.Error(e, "Failed to open scheduled connection for device {deviceId}. {failedAttempts} consecutive failed attempts so far. Connection scheduled for retry after {backoff} seconds.", deviceId, failedAttempts, backoff);
var notBefore = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + backoff;
_scheduledConnectionsNotBefore.AddOrUpdate(deviceId, notBefore, (key, oldValue) => notBefore);
}
finally
{
mutex.Release();
}
}
/// <summary>
/// Schedule retries on connection drops.
/// </summary>
private async Task GetRetryGlobalConnectionStatusChangeCallback(string deviceId, ConnectionStatus status, ConnectionStatusChangeReason reason)
{
// Permanent failure state, taken from https://github.com/Azure-Samples/azure-iot-samples-csharp/tree/master/iot-hub/Samples/device/DeviceReconnectionSample
bool isFailed = status == ConnectionStatus.Disconnected;
// Don't retry if it's not a permanent failure.
if (!isFailed)
{
_logger.Info("Ignoring global connection status change for device {deviceId} as status is not failed. Status: {status}. Reason: {reason}", deviceId, status, reason);
return;
}
// Only retry if the device requires a permanent connection.
if (!_hasDataSubscriptions.TryGetValue(deviceId, out _))
{
_logger.Info("Ignoring global connection status change for device {deviceId} as it does not have data subscriptions. Status: {status}. Reason: {reason}", deviceId, status, reason);
return;
}
// Synchronize this block so we don't reschedule a connection retry while a connection attempt is ongoing.
var mutex = _dbAndConnectionStateSyncSemaphores.GetOrAdd(deviceId, new SemaphoreSlim(1, 1));
await mutex.WaitAsync();
try
{
var deviceStatus = _connectionManager.GetDeviceStatus(deviceId);
// The connection status might have changed while we were waiting for the mutex.
if (deviceStatus?.status != ConnectionStatus.Disconnected)
{
_logger.Info("Skipping retry attempt for device {deviceId} as status is no longer failed.", deviceId);
return;
}
// If a connection is already scheduled, don't schedule again.
if (_scheduledConnectionsNotBefore.TryGetValue(deviceId, out _))
{
_logger.Info("Skipping retry attempt for device {deviceId} as it already has a scheduled connection.", deviceId);
return;
}
// A permanent failure means that the SDK has retried for a while, so reschedule a connection attempt as soon as possible.
_logger.Info("Scheduling connection retry for device {deviceId}.", deviceId);
var notBefore = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
_scheduledConnectionsNotBefore.AddOrUpdate(deviceId, notBefore, (key, oldValue) => notBefore);
}
finally
{
mutex.Release();
}
}
}
}