in iothub/device/src/Transport/Mqtt/MqttTransportHandler.cs [593:656]
/// <summary>
/// Dispatches a received MQTT message to the handler that matches its topic.
/// </summary>
/// <remarks>
/// The message is only processed while <c>State</c> has the <c>TransportState.Open</c> flag set;
/// otherwise it is ignored. Topic prefixes are compared case-insensitively (ordinal) and are
/// checked in this order: twin response, twin patch, method post, event message, device-bound message.
/// Because this method is <c>async void</c>, every exception is caught here and forwarded to
/// <c>OnError</c> rather than being allowed to escape.
/// </remarks>
/// <param name="message">The received message; its <c>MqttTopicName</c> selects the handler.</param>
public async void OnMessageReceived(Message message)
{
    if (Logging.IsEnabled)
    {
        Logging.Enter(this, message, nameof(OnMessageReceived));
    }

    // The try-catch was added to avoid an unknown thread exception
    // that showed up after running for more than 24 hours.
    try
    {
        bool isTransportOpen = (State & TransportState.Open) == TransportState.Open;
        if (!isTransportOpen)
        {
            // Nothing to do; the finally block still records the exit.
            return;
        }

        string receivedTopic = message.MqttTopicName;
        if (Logging.IsEnabled)
        {
            Logging.Info(this, $"Received a message on topic: {receivedTopic}", nameof(OnMessageReceived));
        }

        if (receivedTopic.StartsWith(TwinResponseTopicPrefix, StringComparison.OrdinalIgnoreCase))
        {
            // Hand the response to the twin response callback first, then complete the message.
            _twinResponseEvent(message);
            await CompleteIncomingMessageAsync(message).ConfigureAwait(false);
            return;
        }

        if (receivedTopic.StartsWith(TwinPatchTopicPrefix, StringComparison.OrdinalIgnoreCase))
        {
            await HandleIncomingTwinPatchAsync(message).ConfigureAwait(false);
            return;
        }

        if (receivedTopic.StartsWith(MethodPostTopicPrefix, StringComparison.OrdinalIgnoreCase))
        {
            await HandleIncomingMethodPostAsync(message).ConfigureAwait(false);
            return;
        }

        if (receivedTopic.StartsWith(_receiveEventMessagePrefix, StringComparison.OrdinalIgnoreCase))
        {
            await HandleIncomingEventMessageAsync(message).ConfigureAwait(false);
            return;
        }

        if (receivedTopic.StartsWith(_deviceboundMessagePrefix, StringComparison.OrdinalIgnoreCase))
        {
            // Device-bound messages are always queued before anything else happens.
            _messageQueue.Enqueue(message);

            if (!_isDeviceReceiveMessageCallbackSet)
            {
                // No receive callback is set: release the receiving semaphore instead.
                _receivingSemaphore.Release();
                return;
            }

            await HandleIncomingMessagesAsync().ConfigureAwait(false);
            return;
        }

        // No known prefix matched; the message is only logged.
        if (Logging.IsEnabled)
        {
            Logging.Error(this, $"Received MQTT message on an unrecognized topic, ignoring message. Topic: {receivedTopic}");
        }
    }
    catch (Exception ex)
    {
        if (Logging.IsEnabled)
        {
            Logging.Error(this, $"Received an exception while processing an MQTT message: {ex}", nameof(OnMessageReceived));
        }

        OnError(ex);
    }
    finally
    {
        if (Logging.IsEnabled)
        {
            Logging.Exit(this, message, nameof(OnMessageReceived));
        }
    }
}