public async Task DataSubscriptionsSyncAndCallbackBehavior()

in DeviceBridgeTests/Services/SubscriptionSchedulerTests.cs [117:232]


        public async Task DataSubscriptionsSyncAndCallbackBehavior()
        {
            // End-to-end check of SynchronizeDeviceDbAndEngineDataSubscriptionsAsync:
            //   1. subscriptions listed by storage cause callbacks to be registered and a connection to be opened;
            //   2. an empty subscription list causes callbacks to be removed and the connection to be closed;
            //   3. operations on one device wait on the same semaphore, other devices on a different one;
            //   4. the registered C2D message, method and desired-property callbacks post the expected payloads
            //      and map the HTTP outcome to the expected result.
            using (ShimsContext.Create())
            {
                _storageProviderMock.Invocations.Clear();
                _connectionManagerMock.Invocations.Clear();

                // Capture all registered callbacks for verification.
                Func<Message, Task<ReceiveMessageCallbackStatus>> messageCallback = null;
                MethodCallback methodCallback = null;
                DesiredPropertyUpdateCallback propertyCallback = null;
                _connectionManagerMock.Setup(p => p.SetMessageCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()))
                    .Callback<string, string, Func<Message, Task<ReceiveMessageCallbackStatus>>>((_, __, callback) => messageCallback = callback);
                _connectionManagerMock.Setup(p => p.SetMethodCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<MethodCallback>()))
                    .Callback<string, string, MethodCallback>((_, __, callback) => methodCallback = callback);
                _connectionManagerMock.Setup(p => p.SetDesiredPropertyUpdateCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<DesiredPropertyUpdateCallback>()))
                    .Callback<string, string, DesiredPropertyUpdateCallback>((_, __, callback) => propertyCallback = callback);

                // If subscriptions are returned from the DB, callbacks are registered and a connection is scheduled.
                _storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
                var c2dSub = TestUtils.GetTestSubscription("test-device-id", DeviceSubscriptionType.C2DMessages);
                var methodSub = TestUtils.GetTestSubscription("test-device-id", DeviceSubscriptionType.Methods);
                var propertySub = TestUtils.GetTestSubscription("test-device-id", DeviceSubscriptionType.DesiredProperties);
                _storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>() { c2dSub, methodSub, propertySub }));
                var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
                var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
                SemaphoreSlim createSemaphore = null;
                TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => createSemaphore = capturedSemaphore);
                await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-id", false);
                _connectionManagerMock.Verify(p => p.SetMessageCallbackAsync("test-device-id", "http://abc", It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()), Times.Once);

                // Fail early with a clear message if any callback was not registered, instead of hitting a
                // NullReferenceException when the callbacks are invoked further below.
                Assert.IsNotNull(messageCallback, "C2D message callback was not registered");
                Assert.IsNotNull(methodCallback, "Method callback was not registered");
                Assert.IsNotNull(propertyCallback, "Desired property update callback was not registered");

                await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler);
                _connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-id", false, null), Times.Once);

                // If DB returns no subscriptions, callbacks are unregistered and connection is closed.
                _connectionManagerMock.Invocations.Clear();
                _storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
                SemaphoreSlim deleteSemaphore = null;
                TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => deleteSemaphore = capturedSemaphore);
                await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-id", false);
                _connectionManagerMock.Verify(p => p.RemoveMessageCallbackAsync("test-device-id"), Times.Once);
                _connectionManagerMock.Verify(p => p.RemoveMethodCallbackAsync("test-device-id"), Times.Once);
                _connectionManagerMock.Verify(p => p.RemoveDesiredPropertyUpdateCallbackAsync("test-device-id"), Times.Once);
                _connectionManagerMock.Verify(p => p.AssertDeviceConnectionClosedAsync("test-device-id", false), Times.Once);

                // Create and delete lock on the same semaphore. The null checks keep the comparison from passing
                // vacuously when no semaphore was captured at all (null would compare equal to null).
                Assert.IsNotNull(createSemaphore, "No semaphore was captured while creating subscriptions");
                Assert.IsNotNull(deleteSemaphore, "No semaphore was captured while deleting subscriptions");
                Assert.AreSame(createSemaphore, deleteSemaphore);

                // Operations on another device lock over a different semaphore.
                _storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "another-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>() { }));
                SemaphoreSlim anotherDeviceSemaphore = null;
                TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => anotherDeviceSemaphore = capturedSemaphore);
                await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("another-device-id", false);
                Assert.IsNotNull(anotherDeviceSemaphore, "No semaphore was captured for the second device");
                Assert.AreNotSame(anotherDeviceSemaphore, deleteSemaphore);

                // Checks that C2D message callback sends proper body and accepts the message on success.
                _httpClientFactoryMock.Setup(p => p.CreateClient("RetryClient")).Returns(new System.Net.Http.Fakes.ShimHttpClient().Instance);
                System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
                {
                    Assert.AreEqual("http://abc", url);
                    var result = JsonConvert.DeserializeObject<C2DMessageInvocationEventBody>(payload.ReadAsStringAsync().Result);
                    Assert.AreEqual("test-device-id", result.DeviceId);
                    Assert.AreEqual("{\"tel\":1}", result.MessageBody.Value);

                    return Task.FromResult(new HttpResponseMessage()
                    {
                        StatusCode = (HttpStatusCode)200,
                    });
                };
                Message testMsg = new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}"));
                var callbackResult = await messageCallback(testMsg);
                Assert.AreEqual(ReceiveMessageCallbackStatus.Accept, callbackResult);

                // Checks that message callback rejects the message on 4xx errors.
                System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) => Task.FromResult(new HttpResponseMessage() { StatusCode = (HttpStatusCode)401, });
                callbackResult = await messageCallback(new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}")));
                Assert.AreEqual(ReceiveMessageCallbackStatus.Reject, callbackResult);

                // Checks that message callback abandons the message on network errors.
                System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) => throw new Exception();
                callbackResult = await messageCallback(new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}")));
                Assert.AreEqual(ReceiveMessageCallbackStatus.Abandon, callbackResult);

                // Check that method callback correctly extracts the response status from the callback payload.
                System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
                {
                    Assert.AreEqual("http://abc", url);
                    var result = JsonConvert.DeserializeObject<MethodInvocationEventBody>(payload.ReadAsStringAsync().Result);
                    Assert.AreEqual("test-device-id", result.DeviceId);
                    Assert.AreEqual("tst-name", result.MethodName);
                    Assert.AreEqual("{\"tel\":1}", result.RequestData.Value);

                    return Task.FromResult(new HttpResponseMessage()
                    {
                        StatusCode = (HttpStatusCode)200,
                        Content = new StringContent("{\"status\": 200}", Encoding.UTF8, "application/json"),
                    });
                };
                var methodCallbackResult = await methodCallback(new MethodRequest("tst-name", Encoding.UTF8.GetBytes("{\"tel\": 1}")), null);
                Assert.AreEqual(200, methodCallbackResult.Status);

                // Check that property update callback sends the correct data.
                // The property callback has no return value to inspect, and an exception thrown inside the shim may be
                // caught by the callback under test (the message callback above turns one into Abandon). The flag is
                // only set once every in-shim assertion has passed, so it is checked after the callback returns.
                var propertyPayloadVerified = false;
                System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) =>
                {
                    Assert.AreEqual("http://abc", url);
                    var result = JsonConvert.DeserializeObject<DesiredPropertyUpdateEventBody>(payload.ReadAsStringAsync().Result);
                    Assert.AreEqual("test-device-id", result.DeviceId);
                    Assert.AreEqual("{\"tel\":1}", result.DesiredProperties.Value);
                    propertyPayloadVerified = true;

                    return Task.FromResult(new HttpResponseMessage()
                    {
                        StatusCode = (HttpStatusCode)200,
                    });
                };
                await propertyCallback(new TwinCollection("{\"tel\": 1}"), null);
                Assert.IsTrue(propertyPayloadVerified, "Desired property update callback did not post the expected payload");
            }
        }