in DeviceBridge/Services/ConnectionManager.cs [165:314]
/// <summary>
/// Ensures that an open <see cref="DeviceClient"/> for <paramref name="deviceId"/> is stored in <c>_clients</c>,
/// creating and opening one if necessary.
/// </summary>
/// <remarks>
/// The whole operation runs under a per-device semaphore taken from <c>_clientSemaphores</c>. An existing client
/// whose last recorded status is <see cref="ConnectionStatus.Disconnected"/> is closed, disposed and replaced.
/// When a hub is cached for the device in <c>_deviceHubs</c>, it is tried first; if that fails (or no hub is
/// cached), the device is registered through DPS and the resulting hub is cached before connecting to it.
/// Note that waiting for the per-device semaphore does not observe <paramref name="cancellationToken"/>.
/// </remarks>
/// <param name="deviceId">Id of the device whose connection must be open.</param>
/// <param name="temporary">
/// When <c>true</c>, (re)sets the temporary-connection expiry for the device to now plus a random number of seconds
/// between <c>TemporaryConnectionMinDurationSeconds</c> and <c>TemporaryConnectionMaxDurationSeconds</c>.
/// When <c>false</c>, marks the device as having a permanent connection.
/// </param>
/// <param name="cancellationToken">Optional token forwarded to DPS registration and to the client open call.</param>
/// <returns>A task that completes once a client for the device is present in <c>_clients</c>.</returns>
public async Task AssertDeviceConnectionOpenAsync(string deviceId, bool temporary = false, CancellationToken? cancellationToken = null)
{
    _logger.Info("Attempting to initialize {connectionType} connection for device {deviceId}", temporary ? "Temporary" : "Permanent", deviceId);
    _lastConnectionAttempt.AddOrUpdate(deviceId, DateTime.Now, (key, oldValue) => DateTime.Now);

    // Use the value-factory overload so a SemaphoreSlim is only allocated when the device has no entry yet,
    // instead of creating (and never disposing) a throwaway instance on every call.
    var mutex = _clientSemaphores.GetOrAdd(deviceId, key => new SemaphoreSlim(1, 1));
    await mutex.WaitAsync();
    try
    {
        _logger.Info("Acquired connection lock for device {deviceId}", deviceId);

        if (temporary)
        {
            // Always renew the connection duration, as the user wants to assert that this connection will live for a few minutes (even if a previous temporary connection exists).
            // We use a random factor to spread out when temporary connections expire.
            var shouldLiveUntil = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + new Random().Next(TemporaryConnectionMinDurationSeconds, TemporaryConnectionMaxDurationSeconds);
            _hasTemporaryConnectionUntil.AddOrUpdate(deviceId, shouldLiveUntil, (key, oldValue) => shouldLiveUntil);
            _logger.Info("Temporary connection for device {deviceId} set to live at least until {shouldLiveUntil}", deviceId, DateTimeOffset.FromUnixTimeSeconds(shouldLiveUntil).UtcDateTime);
        }
        else
        {
            _hasPermanentConnection.AddOrUpdate(deviceId, true, (key, oldValue) => true);
        }

        // Dispose the current client if it is in a permanent failure state.
        if (_clientStatuses.TryGetValue(deviceId, out (ConnectionStatus status, ConnectionStatusChangeReason reason) currentStatus))
        {
            // Permanent failure state, taken from https://github.com/Azure-Samples/azure-iot-samples-csharp/tree/master/iot-hub/Samples/device/DeviceReconnectionSample
            bool isFailed = currentStatus.status == ConnectionStatus.Disconnected;
            if (isFailed && _clients.TryRemove(deviceId, out DeviceClient existingClient))
            {
                _logger.Info("Disposing existing failed client for device {deviceId}", deviceId);
                try
                {
                    await existingClient.CloseAsync();
                }
                finally
                {
                    // The client has already been removed from _clients, so it must be released here even if
                    // closing throws; otherwise nothing would ever dispose it. The handler is detached before
                    // Dispose so that no member is invoked on an already-disposed client.
                    existingClient.SetConnectionStatusChangesHandler(null);
                    existingClient.Dispose();
                }
            }
        }

        if (_clients.TryGetValue(deviceId, out DeviceClient _))
        {
            _logger.Info("Connection for device {deviceId} already exists", deviceId);
            return;
        }

        var deviceKey = ComputeDerivedSymmetricKey(Convert.FromBase64String(_sasKey), deviceId);

        // If we already know this device's hub, attempt to connect to it first.
        if (_deviceHubs.TryGetValue(deviceId, out string knownDeviceHub))
        {
            try
            {
                var cachedHubClient = await BuildAndOpenClient(_logger, knownDeviceHub, deviceKey, cancellationToken);
                _clients.AddOrUpdate(deviceId, cachedHubClient, (key, oldValue) => cachedHubClient);
                return;
            }
            catch (Exception e)
            {
                // Deliberately not rethrown: a failure against the cached hub falls through to DPS registration below.
                _logger.Error(e, "Failed to connect device {deviceId} to it's old hub ({knownDeviceHub}). Will try DPS registration again.", deviceId, knownDeviceHub);
            }
        }

        // No cached hub, or connecting to the cached hub failed: try DPS registration.
        var deviceHub = await DpsRegisterInternalAsync(_logger, deviceId, deviceKey, null, cancellationToken);
        _deviceHubs.AddOrUpdate(deviceId, deviceHub, (key, oldValue) => deviceHub);

        try
        {
            await _storageProvider.AddOrUpdateHubCacheEntry(_logger, deviceId, deviceHub);
        }
        catch (Exception e)
        {
            // Storing the hub is a best-effort operation.
            _logger.Error(e, "Failed to update Hub cache for device {deviceId}", deviceId);
        }

        var registeredHubClient = await BuildAndOpenClient(_logger, deviceHub, deviceKey, cancellationToken);
        _clients.AddOrUpdate(deviceId, registeredHubClient, (key, oldValue) => registeredHubClient);
    }
    finally
    {
        mutex.Release();
    }

    // Creates a client for the given hub, wires up the status handler, retry policy and any callbacks
    // registered for this device, then opens it. On any failure the client is disposed and the exception rethrown.
    async Task<DeviceClient> BuildAndOpenClient(Logger logger, string candidateHub, string deviceKey, CancellationToken? cancellationToken = null)
    {
        _logger.Info("Attempting to connect device {deviceId} to hub {candidateHub}", deviceId, candidateHub);
        DeviceClient client = null;
        try
        {
            var settings = new ITransportSettings[]
            {
                new AmqpTransportSettings(TransportType.Amqp_Tcp_Only)
                {
                    AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings()
                    {
                        Pooling = true,
                        MaxPoolSize = _maxPoolSize,
                    },
                },
            };

            var connString = GetDeviceConnectionString(deviceId, candidateHub, deviceKey);
            client = DeviceClient.CreateFromConnectionString(connString, settings);
            client.SetConnectionStatusChangesHandler(BuildConnectionStatusChangeHandler(deviceId));
            client.SetRetryPolicy(new ExponentialBackoff(ClientRetryCount, TimeSpan.FromMilliseconds(ClientRetryMinBackoffMs), TimeSpan.FromSeconds(ClientRetryMaxBackoffSec), TimeSpan.FromMilliseconds(ClientRetryDeltaBackoffMs)));

            // If a desired property callback exists, register it.
            if (_desiredPropertyUpdateCallbacks.TryGetValue(deviceId, out var desiredPropertyUpdateCallback))
            {
                await client.SetDesiredPropertyUpdateCallbackAsync(desiredPropertyUpdateCallback.callback, null);
            }

            // If a method callback exists, register it.
            if (_methodCallbacks.TryGetValue(deviceId, out var methodCallback))
            {
                await client.SetMethodDefaultHandlerAsync(methodCallback.callback, null);
            }

            // If a C2DMessage callback exists, register it.
            if (_messageCallbacks.TryGetValue(deviceId, out var messageCallback))
            {
                await client.SetReceiveMessageHandlerAsync(messageCallback.callback, null);
            }

            if (cancellationToken.HasValue)
            {
                await client.OpenAsync(cancellationToken.Value);
            }
            else
            {
                await client.OpenAsync();
            }

            _logger.Info("Device {deviceId} connected to hub {candidateHub}", deviceId, candidateHub);
            return client;
        }
        catch (Exception)
        {
            // Dispose of the failed client to make sure it doesn't retry internally.
            // The status handler is detached first so nothing is invoked on a disposed client.
            if (client != null)
            {
                client.SetConnectionStatusChangesHandler(null);
                client.Dispose();
            }

            // Rethrow with "throw;" (not "throw e;") so the original stack trace is preserved.
            throw;
        }
    }
}