public async Task SubscriptionStartupInitializationFromDB()

in DeviceBridgeTests/Services/SubscriptionSchedulerTests.cs [33:113]


        // Exercises scheduler startup when subscriptions are loaded from storage:
        //   1. connection status callbacks are registered while the scheduler is being constructed,
        //   2. data subscriptions are initialized in batches of 2 with a Task.Delay(10) between batches,
        //   3. an explicit resync of a device removes it from the pending initialization list,
        //   4. connections are opened for the devices that have data subscriptions.
        // NOTE: statement order matters here. Each shim/mock must be installed before the scheduler call
        // that is expected to hit it, so keep the sequence intact when editing.
        public async Task SubscriptionStartupInitializationFromDB()
        {
            using (ShimsContext.Create())
            {
                const string callbackUrl = "http://abc";

                _connectionManagerMock.Invocations.Clear();
                _storageProviderMock.Invocations.Clear();

                // Storage hands back subscriptions for 5 devices:
                //   device 1: C2D messages + connection status
                //   device 2: connection status only (no data subscription)
                //   device 3: methods + desired properties
                //   device 4: desired properties
                //   device 5: desired properties (explicitly resynced further down)
                var storedSubscriptions = new List<DeviceSubscription>()
                {
                    TestUtils.GetTestSubscription("test-device-1", DeviceSubscriptionType.C2DMessages),
                    TestUtils.GetTestSubscription("test-device-1", DeviceSubscriptionType.ConnectionStatus),
                    TestUtils.GetTestSubscription("test-device-2", DeviceSubscriptionType.ConnectionStatus),
                    TestUtils.GetTestSubscription("test-device-3", DeviceSubscriptionType.Methods),
                    TestUtils.GetTestSubscription("test-device-3", DeviceSubscriptionType.DesiredProperties),
                    TestUtils.GetTestSubscription("test-device-4", DeviceSubscriptionType.DesiredProperties),
                    TestUtils.GetTestSubscription("test-device-5", DeviceSubscriptionType.DesiredProperties),
                };
                _storageProviderMock
                    .Setup(storage => storage.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>()))
                    .Returns(Task.FromResult(storedSubscriptions));

                // Every POST issued through the shimmed HttpClient must target the callback URL and carry
                // a "Connected / Connection_Ok" status event for one of the test devices.
                var fakeHttpClient = new System.Net.Http.Fakes.ShimHttpClient().Instance;
                _httpClientFactoryMock
                    .Setup(factory => factory.CreateClient("RetryClient"))
                    .Returns(fakeHttpClient);
                System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (httpClient, requestUrl, content) =>
                {
                    Assert.AreEqual(callbackUrl, requestUrl);
                    var json = content.ReadAsStringAsync().Result;
                    var statusEvent = JsonConvert.DeserializeObject<ConnectionStatusChangeEventBody>(json);
                    StringAssert.StartsWith("test-device-", statusEvent.DeviceId);
                    Assert.AreEqual(ConnectionStatus.Connected.ToString(), statusEvent.Status);
                    Assert.AreEqual(ConnectionStatusChangeReason.Connection_Ok.ToString(), statusEvent.Reason);
                    return Task.FromResult(new System.Net.Http.Fakes.ShimHttpResponseMessage().Instance);
                };

                // Fire a status change the moment a connection status callback gets registered.
                // The Task returned by the callback is intentionally not awaited here.
                _connectionManagerMock
                    .Setup(connections => connections.SetConnectionStatusCallback(It.IsAny<string>(), It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>()))
                    .Callback<string, Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>((deviceId, onStatusChange) =>
                        onStatusChange(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok));

                var callbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
                var scheduler = new SubscriptionScheduler(
                    LogManager.GetCurrentClassLogger(),
                    _connectionManagerMock.Object,
                    _storageProviderMock.Object,
                    callbackFactory,
                    2,
                    10);

                // Constructing the scheduler must have read the subscriptions once and registered a status
                // callback for each of the two devices that have a connection status subscription (1 and 2).
                _storageProviderMock.Verify(storage => storage.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>()), Times.Once);
                _connectionManagerMock.Verify(connections => connections.SetConnectionStatusCallback("test-device-1", It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>()), Times.Once);
                _connectionManagerMock.Verify(connections => connections.SetConnectionStatusCallback("test-device-2", It.IsAny<Func<ConnectionStatus, ConnectionStatusChangeReason, Task>>()), Times.Once);

                // From here on, record every task that gets a ContinueWith(Action<Task>, TaskContinuationOptions)
                // continuation attached; the test treats these as the per-device initialization tasks.
                var initializationTasks = new List<Task>();
                System.Threading.Tasks.Fakes.ShimTask.AllInstances.ContinueWithActionOfTaskTaskContinuationOptions = (antecedent, continuation, continuationOptions) =>
                {
                    initializationTasks.Add(antecedent);
                    return ShimsContext.ExecuteWithoutShims(() => antecedent.ContinueWith(continuation, continuationOptions));
                };

                // Task.Delay(10) is expected once exactly two devices have been started, i.e. devices are
                // initialized two at a time. The shim completes immediately so the test never really waits.
                System.Threading.Tasks.Fakes.ShimTask.DelayInt32 = delayMs =>
                {
                    Assert.AreEqual(10, delayMs);
                    Assert.AreEqual(2, initializationTasks.Count);
                    return Task.CompletedTask;
                };

                // Explicitly resyncing a device (storage now reports no subscriptions for it) must drop it from
                // the startup initialization list, so the engine is not initialized with stale data.
                _storageProviderMock
                    .Setup(storage => storage.ListDeviceSubscriptions(It.IsAny<Logger>(), It.IsAny<string>()))
                    .Returns(Task.FromResult(new List<DeviceSubscription>()));
                await scheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-5");

                // Run the startup initialization and wait for every captured task.
                // Only devices 1, 3 and 4 should be initialized: device 2 has no data subscriptions
                // and device 5 was resynced above.
                await scheduler.StartDataSubscriptionsInitializationAsync();
                await Task.WhenAll(initializationTasks);
                Assert.AreEqual(3, initializationTasks.Count);

                // Check that the data callbacks were properly registered.
                _connectionManagerMock.Verify(connections => connections.SetMessageCallbackAsync("test-device-1", callbackUrl, It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()), Times.Once);
                _connectionManagerMock.Verify(connections => connections.SetMethodCallbackAsync("test-device-3", callbackUrl, It.IsAny<MethodCallback>()), Times.Once);
                _connectionManagerMock.Verify(connections => connections.SetDesiredPropertyUpdateCallbackAsync("test-device-3", callbackUrl, It.IsAny<DesiredPropertyUpdateCallback>()), Times.Once);

                // Devices 1, 3 and 4 have data subscriptions, so their connections should be opened.
                // NOTE(review): presumably the second argument is the number of connection attempts to wait
                // for (2, then 1, for 3 in total) - confirm against RunSchedulerOnceAndWaitConnectionAttempts.
                await RunSchedulerOnceAndWaitConnectionAttempts(scheduler, 2, 10);
                await RunSchedulerOnceAndWaitConnectionAttempts(scheduler, 1, 10);
                foreach (var connectedDeviceId in new[] { "test-device-1", "test-device-4", "test-device-3" })
                {
                    _connectionManagerMock.Verify(connections => connections.AssertDeviceConnectionOpenAsync(connectedDeviceId, false, null), Times.Once);
                }
            }
        }