DeviceBridgeTests/Services/SubscriptionSchedulerTests.cs (408 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using DeviceBridge.Models;
using DeviceBridge.Providers;
using DeviceBridgeTests.Common;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Shared;
using Microsoft.QualityTools.Testing.Fakes;
using Moq;
using Newtonsoft.Json;
using NLog;
using NUnit.Framework;
namespace DeviceBridge.Services.Tests
{
[TestFixture]
public class SubscriptionSchedulerTests
{
private Mock<IStorageProvider> _storageProviderMock = new Mock<IStorageProvider>();
private Mock<IHttpClientFactory> _httpClientFactoryMock = new Mock<IHttpClientFactory>();
private Mock<ISubscriptionCallbackFactory> _subscriptionCallbackFactoryMock = new Mock<ISubscriptionCallbackFactory>();
private Mock<IConnectionManager> _connectionManagerMock = new Mock<IConnectionManager>();
[Test]
[Description("Verifies that the constructor fetches and initializes all subscriptions from the DB")]
public async Task SubscriptionStartupInitializationFromDB()
{
using (ShimsContext.Create())
{
_connectionManagerMock.Invocations.Clear();
_storageProviderMock.Invocations.Clear();
// Return subscriptions for 4 different devices, with different combinations of status and data subscriptions.
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() {
TestUtils.GetTestSubscription("test-device-1", DeviceSubscriptionType.C2DMessages),
TestUtils.GetTestSubscription("test-device-1", DeviceSubscriptionType.ConnectionStatus),
TestUtils.GetTestSubscription("test-device-2", DeviceSubscriptionType.ConnectionStatus),
TestUtils.GetTestSubscription("test-device-3", DeviceSubscriptionType.Methods),
TestUtils.GetTestSubscription("test-device-3", DeviceSubscriptionType.DesiredProperties),
TestUtils.GetTestSubscription("test-device-4", DeviceSubscriptionType.DesiredProperties),
TestUtils.GetTestSubscription("test-device-5", DeviceSubscriptionType.DesiredProperties),
}));
// Check that status change subscription sends correct payload to callback URL.
_httpClientFactoryMock.Setup(p => p.CreateClient("RetryClient")).Returns(new System.Net.Http.Fakes.ShimHttpClient().Instance);
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
{
Assert.AreEqual("http://abc", url);
var result = JsonConvert.DeserializeObject<ConnectionStatusChangeEventBody>(payload.ReadAsStringAsync().Result);
StringAssert.StartsWith("test-device-", result.DeviceId);
Assert.AreEqual(ConnectionStatus.Connected.ToString(), result.Status);
Assert.AreEqual(ConnectionStatusChangeReason.Connection_Ok.ToString(), result.Reason);
return Task.FromResult(new System.Net.Http.Fakes.ShimHttpResponseMessage().Instance);
};
// Trigger status change as soon as callback is registered.
_connectionManagerMock.Setup(p => p.SetConnectionStatusCallback(It.IsAny<string>(), It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>())).Callback<string, Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>((deviceId, callback) =>
callback(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok));
var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
// Check that status callback for both devices were registered.
_storageProviderMock.Verify(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>()), Times.Once);
_connectionManagerMock.Verify(p => p.SetConnectionStatusCallback("test-device-1", It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>()), Times.Once);
_connectionManagerMock.Verify(p => p.SetConnectionStatusCallback("test-device-2", It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>()), Times.Once);
// Capture all device initialization tasks.
var capturedTasks = new List<Task>();
System.Threading.Tasks.Fakes.ShimTask.AllInstances.ContinueWithActionOfTaskTaskContinuationOptions = (task, action, options) =>
{
capturedTasks.Add(task);
return ShimsContext.ExecuteWithoutShims(() => task.ContinueWith(action, options));
};
// Assert that Task.Delay has been called after two devices have been initialized (i.e., that we're initializing 2 devices at a time).
System.Threading.Tasks.Fakes.ShimTask.DelayInt32 = delay =>
{
Assert.AreEqual(10, delay);
Assert.AreEqual(2, capturedTasks.Count);
return Task.CompletedTask;
};
// Checks that explicitly calling resync on a device removes it from the initialization list (so we don't initialize the engine with stale data).
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), It.IsAny<string>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-5");
// Wait for initialization of all devices to finish.
// The device that didn't have any data subscriptions and the one for which we called resync should not be initialized by this task.
await subscriptionScheduler.StartDataSubscriptionsInitializationAsync();
await Task.WhenAll(capturedTasks);
Assert.AreEqual(3, capturedTasks.Count);
// Check that callbacks were properly registred.
_connectionManagerMock.Verify(p => p.SetMessageCallbackAsync("test-device-1", "http://abc", It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()), Times.Once);
_connectionManagerMock.Verify(p => p.SetMethodCallbackAsync("test-device-3", "http://abc", It.IsAny<MethodCallback>()), Times.Once);
_connectionManagerMock.Verify(p => p.SetDesiredPropertyUpdateCallbackAsync("test-device-3", "http://abc", It.IsAny<DesiredPropertyUpdateCallback>()), Times.Once);
// Devices 1, 3, and 4 have data subscriptions, so their connections should be open.
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 2, 10);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-1", false, null), Times.Once);
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-4", false, null), Times.Once);
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-3", false, null), Times.Once);
}
}
[Test]
[Description("Checks that data subscriptions sync properly registers callbacks and they behave as expected")]
public async Task DataSubscriptionsSyncAndCallbackBehavior()
{
using (ShimsContext.Create())
{
_storageProviderMock.Invocations.Clear();
_connectionManagerMock.Invocations.Clear();
// Capture all registered callbacks for verification.
Func<Message, Task<ReceiveMessageCallbackStatus>> messageCallback = null;
MethodCallback methodCallback = null;
DesiredPropertyUpdateCallback propertyCallback = null;
_connectionManagerMock.Setup(p => p.SetMessageCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()))
.Callback<string, string, Func<Message, Task<ReceiveMessageCallbackStatus>>>((_, __, callback) => messageCallback = callback);
_connectionManagerMock.Setup(p => p.SetMethodCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<MethodCallback>()))
.Callback<string, string, MethodCallback>((_, __, callback) => methodCallback = callback);
_connectionManagerMock.Setup(p => p.SetDesiredPropertyUpdateCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<DesiredPropertyUpdateCallback>()))
.Callback<string, string, DesiredPropertyUpdateCallback>((_, __, callback) => propertyCallback = callback);
// If subscriptions are returned form the DB, callbacks are registered and a connection is scheduled
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
var c2dSub = TestUtils.GetTestSubscription("test-device-id", DeviceSubscriptionType.C2DMessages);
var methodSub = TestUtils.GetTestSubscription("test-device-id", DeviceSubscriptionType.Methods);
var propertySub = TestUtils.GetTestSubscription("test-device-id", DeviceSubscriptionType.DesiredProperties);
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>() { c2dSub, methodSub, propertySub }));
var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
SemaphoreSlim createSemaphore = null;
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => createSemaphore = capturedSemaphore);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-id", false);
_connectionManagerMock.Verify(p => p.SetMessageCallbackAsync("test-device-id", "http://abc", It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()), Times.Once);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler);
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-id", false, null), Times.Once);
// If DB returns no subscriptions, callbacks are unregistered and connection is closed.
_connectionManagerMock.Invocations.Clear();
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
SemaphoreSlim deleteSemaphore = null;
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => deleteSemaphore = capturedSemaphore);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-id", false);
_connectionManagerMock.Verify(p => p.RemoveMessageCallbackAsync("test-device-id"), Times.Once);
_connectionManagerMock.Verify(p => p.RemoveMethodCallbackAsync("test-device-id"), Times.Once);
_connectionManagerMock.Verify(p => p.RemoveDesiredPropertyUpdateCallbackAsync("test-device-id"), Times.Once);
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionClosedAsync("test-device-id", false), Times.Once);
// Create and delete lock on the same semaphore.
Assert.AreEqual(createSemaphore, deleteSemaphore);
// Operations on another device lock over a different semaphore.
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "another-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
SemaphoreSlim anotherDeviceSemaphore = null;
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => anotherDeviceSemaphore = capturedSemaphore);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("another-device-id", false);
Assert.AreNotEqual(anotherDeviceSemaphore, deleteSemaphore);
// Checks that C2D message callback sends proper body and accepts the message on success.
_httpClientFactoryMock.Setup(p => p.CreateClient("RetryClient")).Returns(new System.Net.Http.Fakes.ShimHttpClient().Instance);
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
{
Assert.AreEqual("http://abc", url);
var result = JsonConvert.DeserializeObject<C2DMessageInvocationEventBody>(payload.ReadAsStringAsync().Result);
Assert.AreEqual("test-device-id", result.DeviceId);
Assert.AreEqual("{\"tel\":1}", result.MessageBody.Value);
return Task.FromResult(new HttpResponseMessage()
{
StatusCode = (HttpStatusCode)200,
});
};
Message testMsg = new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}"));
var callbackResult = await messageCallback(testMsg);
Assert.AreEqual(ReceiveMessageCallbackStatus.Accept, callbackResult);
// Checks that message callback rejects the message on a 4xx errors.
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) => Task.FromResult(new HttpResponseMessage() { StatusCode = (HttpStatusCode)401, });
callbackResult = await messageCallback(new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}")));
Assert.AreEqual(ReceiveMessageCallbackStatus.Reject, callbackResult);
// Checks that message callback abandons the message on network errors.
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) => throw new Exception();
callbackResult = await messageCallback(new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}")));
Assert.AreEqual(ReceiveMessageCallbackStatus.Abandon, callbackResult);
// Check that method callback correctly extracts the response status from the callback payload.
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
{
Assert.AreEqual("http://abc", url);
var result = JsonConvert.DeserializeObject<MethodInvocationEventBody>(payload.ReadAsStringAsync().Result);
Assert.AreEqual("test-device-id", result.DeviceId);
Assert.AreEqual("tst-name", result.MethodName);
Assert.AreEqual("{\"tel\":1}", result.RequestData.Value);
return Task.FromResult(new HttpResponseMessage()
{
StatusCode = (HttpStatusCode)200,
Content = new StringContent("{\"status\": 200}", Encoding.UTF8, "application/json"),
});
};
var methodCallbackResult = await methodCallback(new MethodRequest("tst-name", Encoding.UTF8.GetBytes("{\"tel\": 1}")), null);
Assert.AreEqual(200, methodCallbackResult.Status);
// Check that property update callback sends the correct data.
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
{
Assert.AreEqual("http://abc", url);
var result = JsonConvert.DeserializeObject<DesiredPropertyUpdateEventBody>(payload.ReadAsStringAsync().Result);
Assert.AreEqual("test-device-id", result.DeviceId);
Assert.AreEqual("{\"tel\":1}", result.DesiredProperties.Value);
return Task.FromResult(new HttpResponseMessage()
{
StatusCode = (HttpStatusCode)200,
});
};
await propertyCallback(new TwinCollection("{\"tel\": 1}"), null);
}
}
[Test]
[Description("Subscription scheduler only attempts up to _connectionBatchSize per scheduling interval")]
public async Task SubscriptionSchedulerRespectsConnectionBatchSize()
{
using (ShimsContext.Create())
{
// Return 3 devices with subscriptions, which the schedule will attempt to connect.
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-1")).Returns(Task.FromResult(new List<DeviceSubscription>() { TestUtils.GetTestSubscription("test-device-1", DeviceSubscriptionType.C2DMessages) }));
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-2")).Returns(Task.FromResult(new List<DeviceSubscription>() { TestUtils.GetTestSubscription("test-device-1", DeviceSubscriptionType.C2DMessages) }));
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-3")).Returns(Task.FromResult(new List<DeviceSubscription>() { TestUtils.GetTestSubscription("test-device-1", DeviceSubscriptionType.C2DMessages) }));
var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-1", false);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-2", false);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-3", false);
// We set a batch size of 2, so the scheduler should attempt 2 connections, then 1, then 0.
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 2, 10);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
}
}
[Test]
[Description("Subscription scheduler doesn't attempt connection until notBefore has expired")]
public async Task SubscriptionSchedulerRespectsNotBefore()
{
using (ShimsContext.Create())
{
// Return 1 devices with subscription, which the schedule will attempt to connect.
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device")).Returns(Task.FromResult(new List<DeviceSubscription>() { TestUtils.GetTestSubscription("test-device", DeviceSubscriptionType.C2DMessages) }));
var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
// Move clock so subscription will be scheduled in the future.
TestUtils.ShimUtcNowAhead(1);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device", false);
TestUtils.UnshimUtcNow();
// Check that connection is not attempted.
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
}
}
[Test]
[Description("AttemptDeviceConnection locks on the same semaphore as subscription sync")]
public async Task AttemptDeviceConnectionSemaphore()
{
using (ShimsContext.Create())
{
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device")).Returns(Task.FromResult(new List<DeviceSubscription>() { TestUtils.GetTestSubscription("test-device", DeviceSubscriptionType.C2DMessages) }));
var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
SemaphoreSlim syncSemaphore = null;
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => syncSemaphore = capturedSemaphore);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device", false);
SemaphoreSlim connectionAttemptSemaphore = null;
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => connectionAttemptSemaphore = capturedSemaphore);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
Assert.AreEqual(syncSemaphore, connectionAttemptSemaphore);
}
}
[Test]
[Description("AttemptDeviceConnection reschedules connections 5, 10, 15, 20, 25, 30 min apart on connection failures")]
public async Task AttemptDeviceConnectionBackoff()
{
using (ShimsContext.Create())
{
// Make random(min, max) always return max so we always schedule the connection at the maximum offset.
System.Fakes.ShimRandom.AllInstances.NextInt32Int32 = (@this, min, max) => max;
// Make connection fail once so it will be rescheduled for 5 minutes.
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device")).Returns(Task.FromResult(new List<DeviceSubscription>() { TestUtils.GetTestSubscription("test-device", DeviceSubscriptionType.C2DMessages) }));
var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device", false);
_connectionManagerMock.Setup(p => p.AssertDeviceConnectionOpenAsync("test-device", false, null)).Returns(Task.FromException(new Exception("Open failed")));
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
// Attempt connections forwarding the clock 4 and 5 minutes, to make sure it only attempts to connect after 5 minutes.
TestUtils.ShimUtcNowAheadOnceAndRevert(4);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
TestUtils.ShimUtcNowAheadOnceAndRevert(5);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
// Second failure should back off 10 minutes.
TestUtils.ShimUtcNowAheadOnceAndRevert(9);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
TestUtils.ShimUtcNowAheadOnceAndRevert(10);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
// Third failure should back off 15 minutes.
TestUtils.ShimUtcNowAheadOnceAndRevert(14);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
TestUtils.ShimUtcNowAheadOnceAndRevert(15);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
// Fourth failure should back off 20 minutes.
TestUtils.ShimUtcNowAheadOnceAndRevert(19);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
TestUtils.ShimUtcNowAheadOnceAndRevert(20);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
// Fifth failure should back off 25 minutes.
TestUtils.ShimUtcNowAheadOnceAndRevert(24);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
TestUtils.ShimUtcNowAheadOnceAndRevert(25);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
// Sixth failure should back off 30 minutes.
TestUtils.ShimUtcNowAheadOnceAndRevert(29);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
TestUtils.ShimUtcNowAheadOnceAndRevert(30);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
// All subsequent failures should back off 30 minutes.
TestUtils.ShimUtcNowAheadOnceAndRevert(29);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
TestUtils.ShimUtcNowAheadOnceAndRevert(30);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
}
}
[Test]
[Description("AttemptDeviceConnection clears _consecutiveConnectionFailures on successful connection")]
public async Task AttemptDeviceConnectionClearsConsecutiveFailuresOnSuccess()
{
using (ShimsContext.Create())
{
// Make random(min, max) always return max so we always schedule the connection at the maximum offset.
System.Fakes.ShimRandom.AllInstances.NextInt32Int32 = (@this, min, max) => max;
// Make connection fail once so it will be rescheduled for 5 minutes.
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device")).Returns(Task.FromResult(new List<DeviceSubscription>() { TestUtils.GetTestSubscription("test-device", DeviceSubscriptionType.C2DMessages) }));
var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device", false);
_connectionManagerMock.Setup(p => p.AssertDeviceConnectionOpenAsync("test-device", false, null)).Returns(Task.FromException(new Exception("Open failed")));
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
// Attempt connections forwarding the clock 4 and 5 minutes, to make sure it only attempts to connect after 5 minutes (making the connection succeed).
TestUtils.ShimUtcNowAhead(4);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
_connectionManagerMock.Setup(p => p.AssertDeviceConnectionOpenAsync("test-device", false, null)).Returns(Task.CompletedTask);
TestUtils.ShimUtcNowAhead(5);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
// Make the connection fail again so it's rescheduled.
TestUtils.UnshimUtcNow();
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device", false);
_connectionManagerMock.Setup(p => p.AssertDeviceConnectionOpenAsync("test-device", false, null)).Returns(Task.FromException(new Exception("Open failed")));
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
// Check that the connection was rescheduled for 5 minutes and not 10, as the previous successful connection cleared the consecutive failed attempt count.
TestUtils.ShimUtcNowAhead(4);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
TestUtils.ShimUtcNowAhead(5);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
}
}
[Test]
[Description("Connection retry on connection status changes/failures")]
public async Task GlobalConnectionStatusCallback()
{
using (ShimsContext.Create())
{
// Capture the status callback once it's registered.
Func<string, ConnectionStatus, ConnectionStatusChangeReason, Task> globalStatusChangeCallback = null;
_connectionManagerMock.Setup(p => p.SetGlobalConnectionStatusCallback(It.IsAny<Func<string, ConnectionStatus, ConnectionStatusChangeReason, Task>>()))
.Callback<Func<string, ConnectionStatus, ConnectionStatusChangeReason, Task>>(callback => globalStatusChangeCallback = callback);
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
// Check that the status change callback was registered.
Assert.NotNull(globalStatusChangeCallback);
// Check that the callback does nothing if the device doesn't have a data subscription.
SemaphoreSlim statusChangeSemaphore = null;
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => statusChangeSemaphore = capturedSemaphore);
await globalStatusChangeCallback("test-device", ConnectionStatus.Disconnected, ConnectionStatusChangeReason.Retry_Expired);
Assert.IsNull(statusChangeSemaphore);
// Check that the callback does nothing if the device has a data subscription but the state is not failed.
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device")).Returns(Task.FromResult(new List<DeviceSubscription>() { TestUtils.GetTestSubscription("test-device", DeviceSubscriptionType.C2DMessages) }));
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device", false);
statusChangeSemaphore = null;
await globalStatusChangeCallback("test-device", ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
Assert.IsNull(statusChangeSemaphore);
// Clear the connection that was scheduled by the sync.
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 0, 10);
// GetRetryGlobalConnectionStatusChangeCallback locks on the same semaphore as sync.
SemaphoreSlim syncSemaphore = null;
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => syncSemaphore = capturedSemaphore);
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device", false);
statusChangeSemaphore = null;
TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => statusChangeSemaphore = capturedSemaphore);
await globalStatusChangeCallback("test-device", ConnectionStatus.Disconnected, ConnectionStatusChangeReason.Retry_Expired);
Assert.AreEqual(statusChangeSemaphore, syncSemaphore);
// Check that the sync scheduled a connection right away.
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
}
}
[Test]
[Description("Checks that the status of a data subscription is correctly computed from the current device client status")]
public async Task DataSubscriptionStatus()
{
using (ShimsContext.Create())
{
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
_connectionManagerMock.Setup(p => p.GetDeviceStatus(It.IsAny<string>())).Returns((ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok));
// If the registered callbacks don't match the desired ones, the subscription is still starting.
_connectionManagerMock.Setup(p => p.GetCurrentMessageCallbackId(It.IsAny<string>())).Returns("http://another-callback-url");
_connectionManagerMock.Setup(p => p.GetCurrentMethodCallbackId(It.IsAny<string>())).Returns("http://another-callback-url");
_connectionManagerMock.Setup(p => p.GetCurrentDesiredPropertyUpdateCallbackId(It.IsAny<string>())).Returns("http://another-callback-url");
var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, _subscriptionCallbackFactoryMock.Object, 2, 10);
var result = subscriptionScheduler.ComputeDataSubscriptionStatus("test-device-id", DeviceSubscriptionType.C2DMessages, "http://abc");
Assert.AreEqual("Starting", result);
result = subscriptionScheduler.ComputeDataSubscriptionStatus("test-device-id", DeviceSubscriptionType.Methods, "http://abc");
Assert.AreEqual("Starting", result);
result = subscriptionScheduler.ComputeDataSubscriptionStatus("test-device-id", DeviceSubscriptionType.DesiredProperties, "http://abc");
Assert.AreEqual("Starting", result);
// If the callback matches and the device is connected, the subscription is running.
_connectionManagerMock.Setup(p => p.GetCurrentDesiredPropertyUpdateCallbackId(It.IsAny<string>())).Returns("http://abc");
_connectionManagerMock.Setup(p => p.GetDeviceStatus(It.IsAny<string>())).Returns((ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok));
result = subscriptionScheduler.ComputeDataSubscriptionStatus("test-device-id", DeviceSubscriptionType.DesiredProperties, "http://abc");
Assert.AreEqual("Running", result);
// If the device is connected, the subscription is stopped.
_connectionManagerMock.Setup(p => p.GetDeviceStatus(It.IsAny<string>())).Returns((ConnectionStatus.Disconnected, ConnectionStatusChangeReason.Retry_Expired));
result = subscriptionScheduler.ComputeDataSubscriptionStatus("test-device-id", DeviceSubscriptionType.DesiredProperties, "http://abc");
Assert.AreEqual("Stopped", result);
// If the device is not connected or disconnected, the subscription is starting.
_connectionManagerMock.Setup(p => p.GetDeviceStatus(It.IsAny<string>())).Returns((ConnectionStatus.Disconnected_Retrying, ConnectionStatusChangeReason.Communication_Error));
result = subscriptionScheduler.ComputeDataSubscriptionStatus("test-device-id", DeviceSubscriptionType.DesiredProperties, "http://abc");
Assert.AreEqual("Starting", result);
}
}
/// <summary>
/// Runs the connection scheduler once and wait for all connection attempts to finish.
/// </summary>
/// <remarks>Must be used within a ShimsContext.</remarks>
private static async Task RunSchedulerOnceAndWaitConnectionAttempts(SubscriptionScheduler subscriptionScheduler, int? taskCountToAssert = null, int? delayToAssert = null)
{
var connectionAttempTasks = new List<Task>();
System.Threading.Tasks.Fakes.ShimTask.AllInstances.ContinueWithActionOfTaskTaskContinuationOptions = (task, action, options) =>
{
connectionAttempTasks.Add(task);
return ShimsContext.ExecuteWithoutShims(() => task.ContinueWith(action, options));
};
// Stop the subscription scheduler at the first call to 'Delay' and capture the connection attempt tasks initialized by the scheduler.
System.Threading.Tasks.Fakes.ShimTask.DelayInt32 = delay =>
{
if (delayToAssert.HasValue)
{
Assert.AreEqual(delayToAssert.Value, delay);
}
if (taskCountToAssert.HasValue)
{
Assert.AreEqual(taskCountToAssert.Value, connectionAttempTasks.Count);
}
return Task.FromException(new Exception("Cancelled at delay"));
};
try
{
await subscriptionScheduler.StartSubscriptionSchedulerAsync();
throw new AssertionException("Expected StartSubscriptionSchedulerAsync to fail");
}
catch (Exception e)
{
Assert.AreEqual("Cancelled at delay", e.Message);
}
await Task.WhenAll(connectionAttempTasks);
}
}
}