in DeviceBridge/Services/ConnectionManager.cs [703:766]
/// <summary>
/// Registers a cloud-to-device (C2D) message callback for a device. The resulting handler is stored in
/// <c>_messageCallbacks</c> and, if a client for the device currently exists in <c>_clients</c>, it is
/// also set on that client immediately.
/// </summary>
/// <param name="deviceId">Id of the device whose C2D messages should be delivered to <paramref name="callback"/>.</param>
/// <param name="id">Identifier stored together with the handler in <c>_messageCallbacks</c>.</param>
/// <param name="callback">
/// Invoked for each received message. Its result decides how the message is settled:
/// <c>Accept</c> completes it, <c>Abandon</c> abandons it and any other value rejects it.
/// If the callback (or settling the message) throws, the error is logged and the message is abandoned.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
public async Task SetMessageCallbackAsync(string deviceId, string id, Func<Message, Task<ReceiveMessageCallbackStatus>> callback)
{
    _logger.Info("Attempting to set C2DMessage handler for device {deviceId}", deviceId);

    if (callback == null)
    {
        throw new ArgumentNullException(nameof(callback));
    }

    // We need to synchronize this with client creation/close so a race condition doesn't cause us to miss the
    // callback registration on a client that is being currently created.
    // The factory overload avoids allocating a throwaway SemaphoreSlim when one already exists for the device.
    var mutex = _clientSemaphores.GetOrAdd(deviceId, _ => new SemaphoreSlim(1, 1));
    await mutex.WaitAsync();

    try
    {
        _logger.Info("Acquired connection lock for device {deviceId}", deviceId);

        // Wraps the user callback and settles the message on the device's current client.
        async Task OnC2DMessageReceived(Message receivedMessage, object userContext)
        {
            // Look the client up at delivery time rather than relying on userContext.
            if (!_clients.TryGetValue(deviceId, out DeviceClient tmpClient))
            {
                _logger.Info("Unable to find client for device {deviceId}, message will not be completed, rejected or abandoned.", deviceId);
                return;
            }

            try
            {
                var status = await callback(receivedMessage);

                switch (status)
                {
                    case ReceiveMessageCallbackStatus.Accept:
                        await tmpClient.CompleteAsync(receivedMessage);
                        break;
                    case ReceiveMessageCallbackStatus.Abandon:
                        await tmpClient.AbandonAsync(receivedMessage);
                        break;
                    default:
                        await tmpClient.RejectAsync(receivedMessage);
                        break;
                }
            }
            catch (Exception e)
            {
                // Record the failure instead of swallowing it silently, then abandon the message.
                _logger.Error(e, "Failed to process C2DMessage for device {deviceId}, message will be abandoned.", deviceId);
                await tmpClient.AbandonAsync(receivedMessage);
            }
        }

        // Store the handler first so it is available even when no client exists yet.
        _messageCallbacks.AddOrUpdate(deviceId, (id, OnC2DMessageReceived), (key, oldValue) => (id, OnC2DMessageReceived));

        // If a client currently exists, register the callback
        if (!_clients.TryGetValue(deviceId, out DeviceClient client))
        {
            _logger.Info("Connection for device {deviceId} not found while trying to set C2DMessage callback. Callback will be registered whenever a new client is created", deviceId);
            return;
        }

        await client.SetReceiveMessageHandlerAsync(OnC2DMessageReceived, client);
    }
    finally
    {
        mutex.Release();
    }
}