in DeviceBridgeTests/Services/SubscriptionSchedulerTests.cs [117:232]
public async Task DataSubscriptionsSyncAndCallbackBehavior()
{
    // Exercises SynchronizeDeviceDbAndEngineDataSubscriptionsAsync for a single device:
    //   1. subscriptions present in the DB -> callbacks registered and a connection opened;
    //   2. no subscriptions in the DB      -> callbacks removed and the connection closed;
    //   3. per-device locking              -> same semaphore for one device, a different one for another;
    //   4. the registered callbacks post the expected payload and map HTTP outcomes to results.
    using (ShimsContext.Create())
    {
        _storageProviderMock.Invocations.Clear();
        _connectionManagerMock.Invocations.Clear();

        // Capture all registered callbacks so they can be invoked directly later in the test.
        Func<Message, Task<ReceiveMessageCallbackStatus>> messageCallback = null;
        MethodCallback methodCallback = null;
        DesiredPropertyUpdateCallback propertyCallback = null;
        _connectionManagerMock.Setup(p => p.SetMessageCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()))
            .Callback<string, string, Func<Message, Task<ReceiveMessageCallbackStatus>>>((_, __, callback) => messageCallback = callback);
        _connectionManagerMock.Setup(p => p.SetMethodCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<MethodCallback>()))
            .Callback<string, string, MethodCallback>((_, __, callback) => methodCallback = callback);
        _connectionManagerMock.Setup(p => p.SetDesiredPropertyUpdateCallbackAsync(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<DesiredPropertyUpdateCallback>()))
            .Callback<string, string, DesiredPropertyUpdateCallback>((_, __, callback) => propertyCallback = callback);

        // If subscriptions are returned from the DB, callbacks are registered and a connection is scheduled.
        _storageProviderMock.Setup(p => p.ListAllSubscriptionsOrderedByDeviceId(It.IsAny<Logger>())).Returns(Task.FromResult(new List<DeviceSubscription>()));
        var c2dSub = TestUtils.GetTestSubscription("test-device-id", DeviceSubscriptionType.C2DMessages);
        var methodSub = TestUtils.GetTestSubscription("test-device-id", DeviceSubscriptionType.Methods);
        var propertySub = TestUtils.GetTestSubscription("test-device-id", DeviceSubscriptionType.DesiredProperties);
        _storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>() { c2dSub, methodSub, propertySub }));
        var subscriptionCallbackFactory = new SubscriptionCallbackFactory(LogManager.GetCurrentClassLogger(), _httpClientFactoryMock.Object);
        var subscriptionScheduler = new SubscriptionScheduler(LogManager.GetCurrentClassLogger(), _connectionManagerMock.Object, _storageProviderMock.Object, subscriptionCallbackFactory, 2, 10);
        SemaphoreSlim createSemaphore = null;
        TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => createSemaphore = capturedSemaphore);
        await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-id", false);
        _connectionManagerMock.Verify(p => p.SetMessageCallbackAsync("test-device-id", "http://abc", It.IsAny<Func<Message, Task<ReceiveMessageCallbackStatus>>>()), Times.Once);

        // Fail here with a clear message rather than with a NullReferenceException when the
        // callbacks are invoked further down.
        Assert.IsNotNull(messageCallback, "C2D message callback was not registered.");
        Assert.IsNotNull(methodCallback, "Method callback was not registered.");
        Assert.IsNotNull(propertyCallback, "Desired property update callback was not registered.");

        await RunSchedulerOnceAndWaitConnectionAttempts(subscriptionScheduler);
        _connectionManagerMock.Verify(p => p.AssertDeviceConnectionOpenAsync("test-device-id", false, null), Times.Once);

        // If DB returns no subscriptions, callbacks are unregistered and connection is closed.
        _connectionManagerMock.Invocations.Clear();
        _storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "test-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>()));
        SemaphoreSlim deleteSemaphore = null;
        TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => deleteSemaphore = capturedSemaphore);
        await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("test-device-id", false);
        _connectionManagerMock.Verify(p => p.RemoveMessageCallbackAsync("test-device-id"), Times.Once);
        _connectionManagerMock.Verify(p => p.RemoveMethodCallbackAsync("test-device-id"), Times.Once);
        _connectionManagerMock.Verify(p => p.RemoveDesiredPropertyUpdateCallbackAsync("test-device-id"), Times.Once);
        _connectionManagerMock.Verify(p => p.AssertDeviceConnectionClosedAsync("test-device-id", false), Times.Once);

        // Create and delete lock on the same semaphore. The null guards keep the identity
        // check from passing vacuously (null == null) if no semaphore was ever captured.
        Assert.IsNotNull(createSemaphore, "No semaphore was captured while creating subscriptions.");
        Assert.IsNotNull(deleteSemaphore, "No semaphore was captured while deleting subscriptions.");
        Assert.AreSame(createSemaphore, deleteSemaphore);

        // Operations on another device lock over a different semaphore.
        _storageProviderMock.Setup(p => p.ListDeviceSubscriptions(It.IsAny<Logger>(), "another-device-id")).Returns(Task.FromResult(new List<DeviceSubscription>()));
        SemaphoreSlim anotherDeviceSemaphore = null;
        TestUtils.CaptureSemaphoreOnWait(capturedSemaphore => anotherDeviceSemaphore = capturedSemaphore);
        await subscriptionScheduler.SynchronizeDeviceDbAndEngineDataSubscriptionsAsync("another-device-id", false);
        Assert.IsNotNull(anotherDeviceSemaphore, "No semaphore was captured for the second device.");
        Assert.AreNotSame(anotherDeviceSemaphore, deleteSemaphore);

        // Checks that C2D message callback sends proper body and accepts the message on success.
        _httpClientFactoryMock.Setup(p => p.CreateClient("RetryClient")).Returns(new System.Net.Http.Fakes.ShimHttpClient().Instance);
        System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = async (client, url, payload) =>
        {
            Assert.AreEqual("http://abc", url);
            var result = JsonConvert.DeserializeObject<C2DMessageInvocationEventBody>(await payload.ReadAsStringAsync());
            Assert.AreEqual("test-device-id", result.DeviceId);
            Assert.AreEqual("{\"tel\":1}", result.MessageBody.Value);
            return new HttpResponseMessage()
            {
                StatusCode = HttpStatusCode.OK,
            };
        };
        Message testMsg = new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}"));
        var callbackResult = await messageCallback(testMsg);
        Assert.AreEqual(ReceiveMessageCallbackStatus.Accept, callbackResult);

        // Checks that message callback rejects the message on 4xx errors.
        System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) => Task.FromResult(new HttpResponseMessage() { StatusCode = HttpStatusCode.Unauthorized, });
        callbackResult = await messageCallback(new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}")));
        Assert.AreEqual(ReceiveMessageCallbackStatus.Reject, callbackResult);

        // Checks that message callback abandons the message on network errors
        // (simulated by making the shimmed PostAsync throw).
        System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = (client, url, payload) => throw new Exception();
        callbackResult = await messageCallback(new Message(Encoding.UTF8.GetBytes("{\"tel\": 1}")));
        Assert.AreEqual(ReceiveMessageCallbackStatus.Abandon, callbackResult);

        // Check that method callback correctly extracts the response status from the callback payload.
        System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = async (client, url, payload) =>
        {
            Assert.AreEqual("http://abc", url);
            var result = JsonConvert.DeserializeObject<MethodInvocationEventBody>(await payload.ReadAsStringAsync());
            Assert.AreEqual("test-device-id", result.DeviceId);
            Assert.AreEqual("tst-name", result.MethodName);
            Assert.AreEqual("{\"tel\":1}", result.RequestData.Value);
            return new HttpResponseMessage()
            {
                StatusCode = HttpStatusCode.OK,
                Content = new StringContent("{\"status\": 200}", Encoding.UTF8, "application/json"),
            };
        };
        var methodCallbackResult = await methodCallback(new MethodRequest("tst-name", Encoding.UTF8.GetBytes("{\"tel\": 1}")), null);
        Assert.AreEqual(200, methodCallbackResult.Status);

        // Check that property update callback sends the correct data.
        System.Net.Http.Fakes.ShimHttpClient.AllInstances.PostAsyncStringHttpContent = async (client, url, payload) =>
        {
            Assert.AreEqual("http://abc", url);
            var result = JsonConvert.DeserializeObject<DesiredPropertyUpdateEventBody>(await payload.ReadAsStringAsync());
            Assert.AreEqual("test-device-id", result.DeviceId);
            Assert.AreEqual("{\"tel\":1}", result.DesiredProperties.Value);
            return new HttpResponseMessage()
            {
                StatusCode = HttpStatusCode.OK,
            };
        };
        await propertyCallback(new TwinCollection("{\"tel\": 1}"), null);
    }
}