in DeviceBridgeTests/Services/SubscriptionSchedulerTests.cs [33:113]
// Verifies that a SubscriptionScheduler built on top of a storage provider that already holds subscriptions:
//   1. lists the stored subscriptions once and registers connection status callbacks during construction,
//   2. registers the data subscription callbacks (C2D messages, methods, desired properties) when
//      StartDataSubscriptionsInitializationAsync runs, skipping devices that were explicitly resynchronized first,
//   3. asks the connection manager to open a connection for every device that has a data subscription.
// Microsoft Fakes shims replace HttpClient.PostAsync, Task.ContinueWith, and Task.Delay, so the order in which the
// shims are installed relative to the scheduler calls below matters.
public async Task SubscriptionStartupInitializationFromDB()
{
using (ShimsContext.Create())
{
// Start from a clean slate so the Verify(..., Times.Once) checks below only count calls made by this test.
_connectionManagerMock.Invocations.Clear();
_storageProviderMock.Invocations.Clear();
// Return subscriptions for 5 different devices, with different combinations of status and data subscriptions:
//   test-device-1: C2D messages + connection status
//   test-device-2: connection status only (no data subscription)
//   test-device-3: methods + desired properties
//   test-device-4: desired properties
//   test-device-5: desired properties (explicitly resynchronized further down)
_storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() {
TestUtils.GetTestSubscription("test-device-1", DeviceSubscriptionType.C2DMessages),
TestUtils.GetTestSubscription("test-device-1", DeviceSubscriptionType.ConnectionStatus),
TestUtils.GetTestSubscription("test-device-2", DeviceSubscriptionType.ConnectionStatus),
TestUtils.GetTestSubscription("test-device-3", DeviceSubscriptionType.Methods),
TestUtils.GetTestSubscription("test-device-3", DeviceSubscriptionType.DesiredProperties),
TestUtils.GetTestSubscription("test-device-4", DeviceSubscriptionType.DesiredProperties),
TestUtils.GetTestSubscription("test-device-5", DeviceSubscriptionType.DesiredProperties),
}));
// Check that status change subscription sends correct payload to callback URL.
// The "RetryClient" HTTP client is a shim instance, and PostAsync is replaced for all HttpClient instances so that
// the posted URL and JSON body can be inspected without any network access.
_httpClientFactoryMock.Setup(p => p.CreateClient("RetryClient")).Returns(new System.Net.Http.Fakes.ShimHttpClient().Instance);
System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
{
Assert.AreEqual("http://abc", url);
var result = JsonConvert.DeserializeObject<ConnectionStatusChangeEventBody>(payload.ReadAsStringAsync().Result);
// Only the device id prefix is checked because the same shim serves the callbacks of every device.
StringAssert.StartsWith("test-device-", result.DeviceId);
Assert.AreEqual(ConnectionStatus.Connected.ToString(), result.Status);
Assert.AreEqual(ConnectionStatusChangeReason.Connection_Ok.ToString(), result.Reason);
return Task.FromResult(new System.Net.Http.Fakes.ShimHttpResponseMessage().Instance);
};
// Trigger status change as soon as callback is registered.
// The callback is invoked with Connected/Connection_Ok, which are the values the PostAsync shim above asserts on.
_connectionManagerMock.Setup(p => p.SetConnectionStatusCallback(It.IsAny<string>(), It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>())).Callback<string, Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>((deviceId, callback) =>
callback(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok));
var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
// NOTE(review): the trailing arguments (2, 10) presumably are the number of devices initialized per batch and the
// delay between batches; this matches the Task.Delay shim assertions below, but confirm against the constructor.
var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
// Check that the stored subscriptions were listed exactly once and that the status callbacks for the two devices
// with a ConnectionStatus subscription (test-device-1 and test-device-2) were registered.
// Only the constructor has run against the mocks since their invocations were cleared, so these calls come from it.
_storageProviderMock.Verify(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>()), Times.Once);
_connectionManagerMock.Verify(p => p.SetConnectionStatusCallback("test-device-1", It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>()), Times.Once);
_connectionManagerMock.Verify(p => p.SetConnectionStatusCallback("test-device-2", It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>()), Times.Once);
// Capture all device initialization tasks.
// Every task on which ContinueWith(Action<Task>, TaskContinuationOptions) is called gets recorded; the real
// ContinueWith is then invoked outside the shims context so the continuation still runs and the shim does not recurse.
var capturedTasks = new List<Task>();
System.Threading.Tasks.Fakes.ShimTask.AllInstances.ContinueWithActionOfTaskTaskContinuationOptions = (task, action, options) =>
{
capturedTasks.Add(task);
return ShimsContext.ExecuteWithoutShims(() => task.ContinueWith(action, options));
};
// Assert that Task.Delay has been called after two devices have been initialized (i.e., that we're initializing 2 devices at a time).
// Returning a completed task keeps the test from actually waiting.
System.Threading.Tasks.Fakes.ShimTask.DelayInt32 = delay =>
{
Assert.AreEqual(10, delay);
Assert.AreEqual(2, capturedTasks.Count);
return Task.CompletedTask;
};
// Checks that explicitly calling resync on a device removes it from the initialization list (so we don't initialize the engine with stale data).
// The per-device listing returns no subscriptions, so the resync of test-device-5 sees an empty subscription set.
_storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), It.IsAny<string>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-5");
// Wait for initialization of all devices to finish.
// The device that didn't have any data subscriptions and the one for which we called resync should not be initialized by this task.
// That leaves test-device-1, test-device-3, and test-device-4, hence the 3 captured tasks expected below.
await subscriptionScheduler.StartDataSubscriptionsInitializationAsync();
await Task.WhenAll(capturedTasks);
Assert.AreEqual(3, capturedTasks.Count);
// Check that callbacks were properly registered.
_connectionManagerMock.Verify(p => p.SetMessageCallbackAsync("test-device-1", "http://abc", It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()), Times.Once);
_connectionManagerMock.Verify(p => p.SetMethodCallbackAsync("test-device-3", "http://abc", It.IsAny<MethodCallback>()), Times.Once);
_connectionManagerMock.Verify(p => p.SetDesiredPropertyUpdateCallbackAsync("test-device-3", "http://abc", It.IsAny<DesiredPropertyUpdateCallback>()), Times.Once);
// Devices 1, 3, and 4 have data subscriptions, so their connections should be open.
// NOTE(review): RunSchedulerOnceAndWaitConnectionAttempts is defined outside this view; presumably it runs one
// scheduler pass and waits for the given number of connection attempts (2, then the remaining 1) - confirm.
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 2, 10);
await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler, 1, 10);
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-1", false, null), Times.Once);
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-4", false, null), Times.Once);
_connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-3", false, null), Times.Once);
}
}