DeviceBridgeTests/Services/ConnectionManagerTests.cs (583 lines of code) (raw):
// Copyright (c) Microsoft Corporation. All rights reserved.
using System;
using System.Collections.Generic;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using DeviceBridge.Common.Exceptions;
using DeviceBridge.Models;
using DeviceBridge.Providers;
using DeviceBridgeTests.Common;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Provisioning.Client;
using Microsoft.Azure.Devices.Shared;
using Microsoft.QualityTools.Testing.Fakes;
using Moq;
using NLog;
using NUnit.Framework;
namespace DeviceBridge.Services.Tests
{
[TestFixture]
public class ConnectionManagerTests
{
private Mock<IStorageProvider> _storageProviderMock = new Mock<IStorageProvider>();
[Test]
public async Task AssertDeviceConnectionOpenAsyncMutualExclusion()
{
using (ShimsContext.Create())
{
var connectionManager = CreateConnectionManager();
// Check that client open and close operations for the same device block on the same mutex.
SemaphoreSlim openSemaphore = null, closeSemaphore = null;
TestUtils.CaptureSemaphoreOnWait((semaphore) => openSemaphore = semaphore);
ShimDps("test-hub.azure.devices.net");
ShimDeviceClient();
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
TestUtils.CaptureSemaphoreOnWait((semaphore) => closeSemaphore = semaphore);
await connectionManager.AssertDeviceConnectionClosedAsync("test-device-id");
Assert.IsNotNull(openSemaphore);
Assert.AreEqual(openSemaphore, closeSemaphore);
// Check that client open operations for different devices block on different mutexes.
SemaphoreSlim anotherDeviceOpenSemaphore = null;
TestUtils.CaptureSemaphoreOnWait((semaphore) => anotherDeviceOpenSemaphore = semaphore);
await connectionManager.AssertDeviceConnectionOpenAsync("another-test-device-id");
Assert.IsNotNull(anotherDeviceOpenSemaphore);
Assert.AreNotEqual(openSemaphore, anotherDeviceOpenSemaphore);
// Check that the mutex is unlocked on failure
ShimDeviceClientToFail();
SemaphoreSlim openFailSemaphore = null;
TestUtils.CaptureSemaphoreOnWait((semaphore) => openFailSemaphore = semaphore);
await ExpectToThrow(() => connectionManager.AssertDeviceConnectionOpenAsync("device-to-fail-id"));
Assert.AreEqual(1, openFailSemaphore.CurrentCount);
// Check that a device connection attempt time is registered before it enters the critical section.
var startTime = DateTime.Now;
SemaphoreSlim connectionTimeSemaphore = null;
ShimDeviceClient();
TestUtils.CaptureSemaphoreOnWait((semaphore) =>
{
connectionTimeSemaphore = semaphore;
Assert.IsNotNull(connectionManager.GetDevicesThatConnectedSince(startTime).Find(id => id == "connection-time-test-id"));
});
await connectionManager.AssertDeviceConnectionOpenAsync("connection-time-test-id");
Assert.NotNull(connectionTimeSemaphore);
}
}
[Test]
public async Task AssertDeviceConnectionOpenAsyncTemporaryVsPermanent()
{
using (ShimsContext.Create())
{
var connectionManager = CreateConnectionManager();
int closeCount = 0;
// If temporary is set to false (default), creates a permanent connection without creating or renewing a temporary connection.
ShimDps("test-hub.azure.devices.net");
ShimDeviceClientAndCaptureClose(() => closeCount++);
await connectionManager.AssertDeviceConnectionOpenAsync("permanent-device-id");
await connectionManager.AssertDeviceConnectionClosedAsync("permanent-device-id", true);
Assert.AreEqual(0, closeCount, "Closing a temporary connection should not have closed a permanent connection");
await connectionManager.AssertDeviceConnectionClosedAsync("permanent-device-id");
Assert.AreEqual(1, closeCount);
// If temporary is set to true, creates a temporary connection if one doesn't exist, without creating a permanent connection.
closeCount = 0;
await connectionManager.AssertDeviceConnectionOpenAsync("temporary-device-id", true);
TestUtils.ShimUtcNowAhead(20); // Move the clock so the temporary connection will expire.
await connectionManager.AssertDeviceConnectionClosedAsync("temporary-device-id");
Assert.AreEqual(0, closeCount, "Closing a permanent connection should not have closed a temporary connection");
await connectionManager.AssertDeviceConnectionClosedAsync("temporary-device-id", true);
Assert.AreEqual(1, closeCount);
// If temporary is set to true, renews a temporary connection if one already exists, without creating a permanent connection.
closeCount = 0;
TestUtils.UnshimUtcNow();
await connectionManager.AssertDeviceConnectionOpenAsync("renew-device-id", true); // Create initial ~10min connection.
TestUtils.ShimUtcNowAhead(5);
await connectionManager.AssertDeviceConnectionOpenAsync("renew-device-id", true); // Move the clock 5min and renew connection for another ~10min, so total connection duration is ~15min.
TestUtils.ShimUtcNowAhead(12);
await connectionManager.AssertDeviceConnectionClosedAsync("renew-device-id", true);
Assert.AreEqual(0, closeCount, "Temporary connection should not have been closed after 12min, as it was renewed for ~15min");
TestUtils.ShimUtcNowAhead(18);
await connectionManager.AssertDeviceConnectionClosedAsync("renew-device-id", true);
Assert.AreEqual(1, closeCount, "Temporary connection should have been closed after 18min.");
}
}
[Test]
public async Task AssertDeviceConnectionOpenAsyncRecreateFailedClient()
{
using (ShimsContext.Create())
{
var connectionManager = CreateConnectionManager();
int closeCount = 0;
// Create a client that instantly goes to a failure state.
ShimDps("test-hub.azure.devices.net");
ShimDeviceClientAndEmitStatus(ConnectionStatus.Disconnected, ConnectionStatusChangeReason.Retry_Expired);
await connectionManager.AssertDeviceConnectionOpenAsync("recreate-failed-device-id");
// Check that it tries to recreate a client in a permanent failure state
ShimDeviceClientAndCaptureClose(() => closeCount++);
await connectionManager.AssertDeviceConnectionOpenAsync("recreate-failed-device-id");
Assert.AreEqual(1, closeCount);
}
}
[Test]
public async Task AssertDeviceConnectionOpenAsyncTriesCachedHub()
{
using (ShimsContext.Create())
{
var hubCache = new List<HubCacheEntry>()
{
new HubCacheEntry()
{
DeviceId = "test-device-id",
Hub = "known-hub.azure.devices.net",
},
};
var connectionManager = CreateConnectionManager(hubCache);
// Check that it Attempts to connect to the cached device hub first, if one exists.
string connStr = null;
ShimDeviceClientAndCaptureConnectionString(capturedConnStr => connStr = capturedConnStr);
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
StringAssert.Contains("known-hub.azure.devices.net", connStr);
// Check that the device client is cached and not reopened in subsequent calls.
bool openAttempted = false;
ShimDeviceClientAndCaptureOpen(() => openAttempted = true);
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
Assert.False(openAttempted);
// Check that DPS registration is eventually attempted if connection error indicates that the device doesn't exist in the target hub.
connectionManager = CreateConnectionManager(hubCache);
ShimDeviceClientToFail(new DeviceNotFoundException());
var registrationAttempted = false;
ShimDpsAndCaptureRegistration("test-hub.azure.devices.net", () => registrationAttempted = true);
await ExpectToThrow(() => connectionManager.AssertDeviceConnectionOpenAsync("test-device-id"));
Assert.True(registrationAttempted);
// Check that DPS registration is attempted if connection attempt fails with unknown error.
registrationAttempted = false;
ShimDeviceClientToFail(new Exception());
await ExpectToThrow(() => connectionManager.AssertDeviceConnectionOpenAsync("test-device-id"));
Assert.True(registrationAttempted);
}
}
[Test]
public async Task AssertDeviceConnectionOpenAsyncDps()
{
using (ShimsContext.Create())
{
var connectionManager = CreateConnectionManager();
_storageProviderMock.Invocations.Clear();
// Checks that it attempts to connect to the hub returned by DPS.
string connStr = null;
ShimDps("test-hub.azure.devices.net");
ShimDeviceClientAndCaptureConnectionString(capturedConnStr => connStr = capturedConnStr);
await connectionManager.AssertDeviceConnectionOpenAsync("dps-test-device-id");
Assert.True(connStr.Contains("test-hub.azure.devices.net"));
// Check that the hub returned by DPS was cached in the DB.
_storageProviderMock.Verify(p => p.AddOrUpdateHubCacheEntry(It.IsAny<Logger>(), "dps-test-device-id", "test-hub.azure.devices.net"), Times.Once);
// Checks that failure to save hub in DB cache doesn't fail the open operation.
connectionManager = CreateConnectionManager();
_storageProviderMock.Setup(p => p.AddOrUpdateHubCacheEntry(It.IsAny<Logger>(), It.IsAny<string>(), It.IsAny<string>())).Throws(new Exception());
await connectionManager.AssertDeviceConnectionOpenAsync("dps-test-device-id");
_storageProviderMock.Setup(p => p.AddOrUpdateHubCacheEntry(It.IsAny<Logger>(), It.IsAny<string>(), It.IsAny<string>())).Verifiable();
// Check that the device client is cached and not reopened in subsequent calls.
bool openAttempted = false;
ShimDeviceClientAndCaptureOpen(() => openAttempted = true);
await connectionManager.AssertDeviceConnectionOpenAsync("dps-test-device-id");
Assert.False(openAttempted);
// Operation fails if DPS registration fails.
connectionManager = CreateConnectionManager();
ShimDpsToFail();
await ExpectToThrow(() => connectionManager.AssertDeviceConnectionOpenAsync("dps-test-device-id"));
// Fails with DpsRegistrationFailedWithUnknownStatusException if DPS returns unknown response.
ShimDps("test-hub.azure.devices.net", ProvisioningRegistrationStatusType.Failed);
await ExpectToThrow(() => connectionManager.AssertDeviceConnectionOpenAsync("dps-test-device-id"), e => e is DpsRegistrationFailedWithUnknownStatusException);
}
}
[Test]
public async Task AssertDeviceConnectionOpenAsyncClient()
{
using (ShimsContext.Create())
{
// Uses correct connection string.
var connectionManager = CreateConnectionManager();
string connStr = null;
ShimDps("test-hub.azure.devices.net");
ShimDeviceClientAndCaptureConnectionString(capturedConnStr => connStr = capturedConnStr);
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes("test-sas-key")))
{
var derivedKey = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes("test-device-id")));
Assert.AreEqual($"HostName=test-hub.azure.devices.net;DeviceId=test-device-id;SharedAccessKey={derivedKey}", connStr);
}
// Sets connection status change handler that updates local device connection status and calls user-defined
// callback if one exists and we're not in hub-probing phase.
connectionManager = CreateConnectionManager();
var globalStatusCallbackCalled = false;
connectionManager.SetGlobalConnectionStatusCallback((_, __, ___) => Task.FromResult(globalStatusCallbackCalled = true));
var statusCallbackCalled = false;
connectionManager.SetConnectionStatusCallback("test-device-id", (_, __) => Task.FromResult(statusCallbackCalled = true));
ShimDeviceClientAndEmitStatus(ConnectionStatus.Connected, ConnectionStatusChangeReason.Connection_Ok);
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
Assert.True(globalStatusCallbackCalled);
Assert.True(statusCallbackCalled);
var status = connectionManager.GetDeviceStatus("test-device-id");
Assert.AreEqual(ConnectionStatus.Connected, status?.status);
Assert.AreEqual(ConnectionStatusChangeReason.Connection_Ok, status?.reason);
// Correctly sets desired property update, methods, and C2D message callbacks if they exist.
connectionManager = CreateConnectionManager();
bool desiredPropertyCallbackCalled = false, methodCallbackCalled = false, c2dCallbackCalled = false;
await connectionManager.SetMethodCallbackAsync("test-device-id", "", (_, __) => {
methodCallbackCalled = true;
return Task.FromResult(new MethodResponse(200));
});
await connectionManager.SetMessageCallbackAsync("test-device-id", "", (_) =>
{
c2dCallbackCalled = true;
return Task.FromResult(ReceiveMessageCallbackStatus.Accept);
});
await connectionManager.SetDesiredPropertyUpdateCallbackAsync("test-device-id", "", (_, __) => Task.FromResult(desiredPropertyCallbackCalled = true));
MethodCallback capturedMethodCallback = null;
ReceiveMessageCallback capturedMessageCallback = null;
DesiredPropertyUpdateCallback capturedPropertyUpdateCallback = null;
ShimDeviceClientAndCaptureAllHandlers(handler => capturedMethodCallback = handler, handler => capturedMessageCallback = handler, handler => capturedPropertyUpdateCallback = handler);
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
await capturedMethodCallback(null, null);
await capturedMessageCallback(null, null);
await capturedPropertyUpdateCallback(null, null);
Assert.True(desiredPropertyCallbackCalled);
Assert.True(methodCallbackCalled);
Assert.True(c2dCallbackCalled);
// Check that client is disposed and status change handler is unregistered if open fails.
bool disposed = false;
ConnectionStatusChangesHandler statusChangeHandler = (_, __) => { };
ShimDeviceClientCaptureCloseDisposeAndConnectionStatusHandler(() => { }, () => disposed = true, handler => statusChangeHandler = handler);
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.OpenAsync = (@this) => throw new Exception();
await ExpectToThrow(() => connectionManager.AssertDeviceConnectionOpenAsync("fail-device-id"));
Assert.True(disposed);
Assert.IsNull(statusChangeHandler);
}
}
[Test]
public async Task Callbacks()
{
using (ShimsContext.Create())
{
var connectionManager = CreateConnectionManager();
// Checks that callbacks are registered even if the was already open at registration time.
var capturedSemaphores = new List<SemaphoreSlim>();
MethodCallback capturedMethodCallback = null;
ReceiveMessageCallback capturedMessageCallback = null;
DesiredPropertyUpdateCallback capturedPropertyUpdateCallback = null;
ShimDeviceClientAndCaptureAllHandlers(handler => capturedMethodCallback = handler, handler => capturedMessageCallback = handler, handler => capturedPropertyUpdateCallback = handler);
ShimDps("test-hub.azure.devices.net");
TestUtils.CaptureSemaphoreOnWait((semaphore) => capturedSemaphores.Add(semaphore));
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
bool desiredPropertyCallbackCalled = false, methodCallbackCalled = false, c2dCallbackCalled = false;
await connectionManager.SetMethodCallbackAsync("test-device-id", "method-callback-id", (_, __) =>
{
methodCallbackCalled = true;
return Task.FromResult(new MethodResponse(200));
});
await connectionManager.SetMessageCallbackAsync("test-device-id", "message-callback-id", (_) =>
{
c2dCallbackCalled = true;
return Task.FromResult(ReceiveMessageCallbackStatus.Accept);
});
await connectionManager.SetDesiredPropertyUpdateCallbackAsync("test-device-id", "property-callback-id", (_, __) => Task.FromResult(desiredPropertyCallbackCalled = true));
await capturedMethodCallback(null, null);
await capturedMessageCallback(null, null);
await capturedPropertyUpdateCallback(null, null);
Assert.True(desiredPropertyCallbackCalled);
Assert.True(methodCallbackCalled);
Assert.True(c2dCallbackCalled);
// Check that callback Ids are correctly returned.
Assert.AreEqual("method-callback-id", connectionManager.GetCurrentMethodCallbackId("test-device-id"));
Assert.AreEqual("message-callback-id", connectionManager.GetCurrentMessageCallbackId("test-device-id"));
Assert.AreEqual("property-callback-id", connectionManager.GetCurrentDesiredPropertyUpdateCallbackId("test-device-id"));
// Check that callbacks are properly removed.
capturedMethodCallback = null;
capturedMessageCallback = null;
var oldCapturedPropertyUpdateCallback = capturedPropertyUpdateCallback;
capturedPropertyUpdateCallback = null;
await connectionManager.RemoveMethodCallbackAsync("test-device-id");
await connectionManager.RemoveMessageCallbackAsync("test-device-id");
await connectionManager.RemoveDesiredPropertyUpdateCallbackAsync("test-device-id");
Assert.IsNull(connectionManager.GetCurrentMethodCallbackId("test-device-id"));
Assert.IsNull(connectionManager.GetCurrentMessageCallbackId("test-device-id"));
Assert.IsNull(connectionManager.GetCurrentDesiredPropertyUpdateCallbackId("test-device-id"));
Assert.IsNull(capturedMethodCallback);
Assert.IsNull(capturedMessageCallback);
Assert.AreNotEqual(capturedPropertyUpdateCallback, oldCapturedPropertyUpdateCallback); // Removing the property callback just replaces it with an empty one
// Check that all callback register/unregister operations locked on the same semaphore as the connection open operation
Assert.AreEqual(7 /* 1 open, 3 register, 3 unregister */, capturedSemaphores.Count);
Assert.IsNull(capturedSemaphores.Find(s => s != capturedSemaphores[0]));
// Check that C2D messages are acknowledged according to the callback result.
ReceiveMessageCallbackStatus status = ReceiveMessageCallbackStatus.Accept;
await connectionManager.SetMessageCallbackAsync("test-device-id", "message-callback-id", (_) =>
{
return Task.FromResult(status);
});
bool completed = false, rejected = false, abandoned = false;
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.CompleteAsyncMessage = (@this, message) => Task.FromResult(completed = true);
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.RejectAsyncMessage = (@this, message) => Task.FromResult(rejected = true);
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.AbandonAsyncMessage = (@this, message) => Task.FromResult(abandoned = true);
status = ReceiveMessageCallbackStatus.Accept;
await capturedMessageCallback(null, null);
Assert.True(completed);
status = ReceiveMessageCallbackStatus.Reject;
await capturedMessageCallback(null, null);
Assert.True(rejected);
status = ReceiveMessageCallbackStatus.Abandon;
await capturedMessageCallback(null, null);
Assert.True(abandoned);
}
}
[Test]
public async Task AssertDeviceConnectionClosedAsync()
{
using (ShimsContext.Create())
{
// Checks that closing the client calls close, dispose, and removes the connection status change handler.
var connectionManager = CreateConnectionManager();
bool closed = false, disposed = false;
ConnectionStatusChangesHandler statusChangeHandler = (_, __) => { };
ShimDeviceClientCaptureCloseDisposeAndConnectionStatusHandler(() => closed = true, () => disposed = true, handler => statusChangeHandler = handler);
ShimDps("test-hub.azure.devices.net");
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
await connectionManager.AssertDeviceConnectionClosedAsync("test-device-id");
Assert.True(closed);
Assert.True(disposed);
Assert.IsNull(statusChangeHandler);
}
}
[Test]
public async Task StandaloneDpsRegistrationAsync()
{
using (ShimsContext.Create())
{
// Check that correct model Id is sent to DPS.
_storageProviderMock.Invocations.Clear();
var connectionManager = CreateConnectionManager();
ProvisioningRegistrationAdditionalData capturedPayload = null;
ShimDpsAndCaptureRegistration("test-hub.azure.devices.net", payload => capturedPayload = payload);
const string testModelId = "test-model-id";
await connectionManager.StandaloneDpsRegistrationAsync(LogManager.GetCurrentClassLogger(), "test-device-id", testModelId);
Assert.AreEqual($"{{\"modelId\":\"{testModelId}\"}}", capturedPayload.JsonData);
// Check that hub was cached in the DB.
_storageProviderMock.Verify(p => p.AddOrUpdateHubCacheEntry(It.IsAny<Logger>(), "test-device-id", "test-hub.azure.devices.net"), Times.Once);
// Checks that failure to save hub in DB cache doesn't fail the operation.
_storageProviderMock.Setup(p => p.AddOrUpdateHubCacheEntry(It.IsAny<Logger>(), It.IsAny<string>(), It.IsAny<string>())).Throws(new Exception());
await connectionManager.StandaloneDpsRegistrationAsync(LogManager.GetCurrentClassLogger(), "test-device-id", testModelId);
_storageProviderMock.Setup(p => p.AddOrUpdateHubCacheEntry(It.IsAny<Logger>(), It.IsAny<string>(), It.IsAny<string>())).Verifiable();
}
}
[Test]
public async Task SendEventAsync()
{
using (ShimsContext.Create())
{
// Check that operation fails if connection doesn't exist.
var connectionManager = CreateConnectionManager();
await ExpectToThrow(() => connectionManager.SendEventAsync(LogManager.GetCurrentClassLogger(), "test-device-id", new Dictionary<string, object>() { { "telemetry", "val1" }, }, default), e => e is DeviceConnectionNotFoundException);
Microsoft.Azure.Devices.Client.Message message = null;
ShimDeviceClientAndCaptureSend(capturedMessage => message = capturedMessage);
ShimDps("test-hub.azure.devices.net");
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
var testCreationTime = DateTime.Now;
await connectionManager.SendEventAsync(LogManager.GetCurrentClassLogger(), "test-device-id", new Dictionary<string, object>() { { "telemetry", "val1" }, }, default, new Dictionary<string, string>() { { "prop", "val2" }, }, "test-component", testCreationTime);
// Check that the correct message is sent.
var body = JsonSerializer.Deserialize<Dictionary<string, string>>(Encoding.UTF8.GetString(message.GetBytes()));
Assert.AreEqual("val1", body["telemetry"]);
Assert.AreEqual(Encoding.UTF8.WebName, message.ContentEncoding);
Assert.AreEqual("application/json", message.ContentType);
Assert.AreEqual("test-component", message.ComponentName);
Assert.AreEqual("val2", message.Properties["prop"]);
Assert.AreEqual(testCreationTime, message.CreationTimeUtc);
}
}
[Test]
public async Task GetTwinAsync()
{
using (ShimsContext.Create())
{
// Check that operation fails if connection doesn't exist.
var connectionManager = CreateConnectionManager();
await ExpectToThrow(() => connectionManager.GetTwinAsync(LogManager.GetCurrentClassLogger(), "test-device-id", default), e => e is DeviceConnectionNotFoundException);
// Check that correct twin is returned.
Twin testTwin = new Twin();
ShimDeviceClientAndReturnTwin(testTwin);
ShimDps("test-hub.azure.devices.net");
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
var returnedTwin = await connectionManager.GetTwinAsync(LogManager.GetCurrentClassLogger(), "test-device-id", default);
Assert.AreEqual(testTwin, returnedTwin);
}
}
[Test]
public async Task UpdateReportedPropertiesAsync()
{
using (ShimsContext.Create())
{
// Check that operation fails if connection doesn't exist.
var connectionManager = CreateConnectionManager();
await ExpectToThrow(() => connectionManager.UpdateReportedPropertiesAsync(LogManager.GetCurrentClassLogger(), "test-device-id", new Dictionary<string, object>() { { "prop", "val2" }, }, default), e => e is DeviceConnectionNotFoundException);
TwinCollection patch = null;
ShimDeviceClientAndCapturePropertyUpdate(capturedPatch => patch = capturedPatch);
ShimDps("test-hub.azure.devices.net");
await connectionManager.AssertDeviceConnectionOpenAsync("test-device-id");
await connectionManager.UpdateReportedPropertiesAsync(LogManager.GetCurrentClassLogger(), "test-device-id", new Dictionary<string, object>() { { "prop", "val2" }, }, default);
// Assert that patch has correct contents.
var body = JsonSerializer.Deserialize<Dictionary<string, string>>(patch.ToJson());
Assert.AreEqual("val2", body["prop"]);
}
}
[Test]
public async Task Dispose()
{
using (ShimsContext.Create())
{
// Check that all clients are disposed when connection manager is disposed.
var connectionManager = CreateConnectionManager();
int disposeCount = 0;
ShimDeviceClientAndCaptureDispose(() => {
disposeCount++;
});
ShimDps("test-hub.azure.devices.net");
await connectionManager.AssertDeviceConnectionOpenAsync("device1");
await connectionManager.AssertDeviceConnectionOpenAsync("device2");
await connectionManager.AssertDeviceConnectionOpenAsync("device3");
connectionManager.Dispose();
Assert.AreEqual(3, disposeCount);
}
}
private ConnectionManager CreateConnectionManager(List<HubCacheEntry> hubCache = null)
{
_storageProviderMock.Setup(p => p.ListHubCacheEntries(It.IsAny<Logger>())).Returns(Task.FromResult(hubCache ?? new List<HubCacheEntry>()));
return new ConnectionManager(LogManager.GetCurrentClassLogger(), "test-id-scope", Convert.ToBase64String(Encoding.UTF8.GetBytes("test-sas-key")), 50, _storageProviderMock.Object);
}
/// <summary>
/// Shims DPS registration to return a successful assignment.
/// </summary>
/// <remarks>Must be used within a ShimsContext.</remarks>
/// <param name="hubToAssign">Hub to be returned in the assignment.</param>
private static void ShimDps(string hubToAssign, ProvisioningRegistrationStatusType? status = null)
{
Microsoft.Azure.Devices.Provisioning.Client.Fakes.ShimProvisioningDeviceClient.AllInstances.RegisterAsync = (@this) =>
Task.FromResult(new DeviceRegistrationResult("some-id", DateTime.Now, hubToAssign, "some-id", status ?? ProvisioningRegistrationStatusType.Assigned, "", DateTime.Now, 0, "", ""));
}
/// <summary>
/// Shims DPS registration to fail.
/// </summary>
/// <remarks>Must be used within a ShimsContext.</remarks>
private static void ShimDpsToFail()
{
Microsoft.Azure.Devices.Provisioning.Client.Fakes.ShimProvisioningDeviceClient.AllInstances.RegisterAsync = (@this) => throw new Exception();
}
/// <summary>
/// Shims DPS registration to return a successful assignment and captures the registration call.
/// </summary>
/// <remarks>Must be used within a ShimsContext.</remarks>
/// <param name="hubToAssign">Hub to be returned in the assignment.</param>
/// <param name="onRegister">Action to execute on registration.</param>
private static void ShimDpsAndCaptureRegistration(string hubToAssign, Action onRegister)
{
Microsoft.Azure.Devices.Provisioning.Client.Fakes.ShimProvisioningDeviceClient.AllInstances.RegisterAsync = (@this) =>
{
onRegister();
return Task.FromResult(new DeviceRegistrationResult("some-id", DateTime.Now, hubToAssign, "some-id", ProvisioningRegistrationStatusType.Assigned, "", DateTime.Now, 0, "", ""));
};
}
/// <summary>
/// Shims DPS registration to return a successful assignment and captures the registration call payload.
/// </summary>
/// <remarks>Must be used within a ShimsContext.</remarks>
/// <param name="hubToAssign">Hub to be returned in the assignment.</param>
/// <param name="onRegister">Action to execute on registration.</param>
private static void ShimDpsAndCaptureRegistration(string hubToAssign, Action<ProvisioningRegistrationAdditionalData> onRegister)
{
Microsoft.Azure.Devices.Provisioning.Client.Fakes.ShimProvisioningDeviceClient.AllInstances.RegisterAsyncProvisioningRegistrationAdditionalData = (@this, payload) =>
{
onRegister(payload);
return Task.FromResult(new DeviceRegistrationResult("some-id", DateTime.Now, hubToAssign, "some-id", ProvisioningRegistrationStatusType.Assigned, "", DateTime.Now, 0, "", ""));
};
}
/// <summary>
/// Shims the DeviceClient to return success in all calls.
/// </summary>
/// <remarks>Must be used within a ShimsContext.</remarks>
private static void ShimDeviceClient()
{
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.CreateFromConnectionStringStringITransportSettingsArrayClientOptions = (string connStr, ITransportSettings[] settings, ClientOptions _) => ShimsContext.ExecuteWithoutShims(() => DeviceClient.CreateFromConnectionString(connStr, settings));
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.OpenAsync = (@this) => Task.CompletedTask;
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.CloseAsync = (@this) => Task.CompletedTask;
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.CompleteAsyncMessage = (@this, message) => Task.CompletedTask;
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.RejectAsyncMessage = (@this, message) => Task.CompletedTask;
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.AbandonAsyncMessage = (@this, message) => Task.CompletedTask;
}
/// <summary>
/// Shims the DeviceClient to return success in all calls and captures open call.
/// </summary>
/// <remarks>Must be used within a ShimsContext.</remarks>
/// <param name="onOpen">Delegate to be called when OpenAsync is called.</param>
private static void ShimDeviceClientAndCaptureOpen(Action onOpen)
{
ShimDeviceClient();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.OpenAsync = (@this) =>
{
onOpen();
return Task.CompletedTask;
};
}
/// <summary>
/// Shims the DeviceClient to return success in all calls and captures close call.
/// </summary>
/// <remarks>Must be used within a ShimsContext.</remarks>
/// <param name="onClose">Delegate to be called when CloseAsync is called.</param>
private static void ShimDeviceClientAndCaptureClose(Action onClose)
{
ShimDeviceClient();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.CloseAsync = (@this) =>
{
onClose();
return Task.CompletedTask;
};
}
/// <summary>
/// Shims the DeviceClient to emit a specific status when the status change handler is registered.
/// </summary>
/// <remarks>Must be used within a ShimsContext.</remarks>
/// <param name="status">status.</param>
/// <param name="reason">status reason.</param>
private static void ShimDeviceClientAndEmitStatus(ConnectionStatus status, ConnectionStatusChangeReason reason)
{
ShimDeviceClient();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.SetConnectionStatusChangesHandlerConnectionStatusChangesHandler = (@this, handler) =>
{
if (handler != null)
{
handler(status, reason);
}
};
}
private static void ShimDeviceClientAndCaptureAllHandlers(Action<MethodCallback> onMethodHandlerCaptured, Action<ReceiveMessageCallback> onMessageHandlerCaptured, Action<DesiredPropertyUpdateCallback> onDesiredPropertyHandlerCaptured)
{
ShimDeviceClient();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.SetMethodDefaultHandlerAsyncMethodCallbackObject = (@this, handler, context) =>
{
onMethodHandlerCaptured(handler);
return Task.CompletedTask;
};
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.SetReceiveMessageHandlerAsyncReceiveMessageCallbackObjectCancellationToken = (@this, handler, context, token) =>
{
onMessageHandlerCaptured(handler);
return Task.CompletedTask;
};
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.SetDesiredPropertyUpdateCallbackAsyncDesiredPropertyUpdateCallbackObject = (@this, handler, context) =>
{
onDesiredPropertyHandlerCaptured(handler);
return Task.CompletedTask;
};
}
/// <summary>
/// Shims the device client and capture the connection string used to create it.
/// </summary>
/// /// <remarks>Must be used within a ShimsContext.</remarks>
/// <param name="onCreate">Action to execute when connection string is captured.</param>
private static void ShimDeviceClientAndCaptureConnectionString(Action<string> onCreate)
{
ShimDeviceClient();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.CreateFromConnectionStringStringITransportSettingsArrayClientOptions = (string connStr, ITransportSettings[] settings, ClientOptions _) => {
onCreate(connStr);
return ShimsContext.ExecuteWithoutShims(() => DeviceClient.CreateFromConnectionString(connStr, settings));
};
}
/// <summary>
/// Shims the device client to fail and capture the connection string used to create it.
/// </summary>
/// /// <remarks>Must be used within a ShimsContext.</remarks>
/// <param name="onCreate">Action to execute when connection string is captured.</param>
/// /// <param name="exception">Exception to throw.</param>
private static void ShimDeviceClientToFailAndCaptureConnectionString(Action<string> onCreate, Exception exception = null)
{
ShimDeviceClientToFail();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.CreateFromConnectionStringStringITransportSettingsArrayClientOptions = (string connStr, ITransportSettings[] settings, ClientOptions _) => {
onCreate(connStr);
throw exception ?? new Exception();
};
}
private static void ShimDeviceClientCaptureCloseDisposeAndConnectionStatusHandler(Action onClose, Action onDispose, Action<ConnectionStatusChangesHandler> onConnectionStatusChangeHandlerSet)
{
ShimDeviceClient();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.CloseAsync = (@this) =>
{
onClose();
return Task.CompletedTask;
};
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.Dispose = (@this) =>
{
onDispose();
};
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.SetConnectionStatusChangesHandlerConnectionStatusChangesHandler = (@this, handler) =>
{
onConnectionStatusChangeHandlerSet(handler);
};
}
private static void ShimDeviceClientAndCaptureSend(Action<Microsoft.Azure.Devices.Client.Message> onSend)
{
ShimDeviceClient();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.SendEventAsyncMessageCancellationToken = (@this, message, _) =>
{
onSend(message);
return Task.CompletedTask;
};
}
private static void ShimDeviceClientAndCapturePropertyUpdate(Action<TwinCollection> onUpdate)
{
ShimDeviceClient();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.UpdateReportedPropertiesAsyncTwinCollectionCancellationToken = (@this, patch, _) =>
{
onUpdate(patch);
return Task.CompletedTask;
};
}
private static void ShimDeviceClientAndCaptureDispose(Action onDispose)
{
ShimDeviceClient();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.Dispose = (@this) => onDispose();
}
private static void ShimDeviceClientAndReturnTwin(Twin twin)
{
ShimDeviceClient();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.GetTwinAsyncCancellationToken = (@this, _) =>
{
return Task.FromResult(twin);
};
}
/// <summary>
/// Shims the DeviceClient to fail in all calls.
/// </summary>
/// <remarks>Must be used within a ShimsContext.</remarks>
/// <param name="exception">Exception to throw.</param>
private static void ShimDeviceClientToFail(Exception exception = null)
{
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.CreateFromConnectionStringStringITransportSettingsArrayClientOptions = (string connStr, ITransportSettings[] settings, ClientOptions _) => throw exception ?? new Exception();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.OpenAsync = (@this) => throw exception ?? new Exception();
Microsoft.Azure.Devices.Client.Fakes.ShimDeviceClient.AllInstances.CloseAsync = (@this) => throw exception ?? new Exception();
}
/// <summary>
/// Asserts that an async function throws.
/// </summary>
/// <param name="fn">The async function to await.</param>
private static async Task ExpectToThrow(Func<Task> fn, Func<Exception, bool> exceptionTest = null)
{
try
{
await fn();
Assert.Fail("Expected function to throw");
}
catch (Exception e)
{
if (exceptionTest != null && !exceptionTest(e))
{
Assert.Fail("Exception didn't match test");
}
}
}
}
}