in iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs [705:758]
private async void ProcessMessageAsync(IChannelHandlerContext context, Packet packet)
{
if (Logging.IsEnabled)
Logging.Enter(this, context.Name, packet.PacketType, nameof(ProcessMessageAsync));
try
{
switch (packet.PacketType)
{
case PacketType.CONNACK:
await ProcessConnectAckAsync(context, (ConnAckPacket)packet).ConfigureAwait(true);
break;
case PacketType.SUBACK:
ProcessSubAck(context, packet as SubAckPacket);
break;
case PacketType.PUBLISH:
ProcessPublish(context, (PublishPacket)packet);
break;
case PacketType.PUBACK:
await _serviceBoundTwoWayProcessor.CompleteWorkAsync(context, ((PubAckPacket)packet).PacketId).ConfigureAwait(true);
break;
case PacketType.PINGRESP:
ProcessPingResp(context, (PingRespPacket)packet);
break;
case PacketType.UNSUBACK:
ProcessUnsubAck(context, packet as UnsubAckPacket);
break;
default:
if (Logging.IsEnabled)
Logging.Error(context, $"Received an unexpected packet type {packet.PacketType}, will shut down.", nameof(ProcessMessageAsync));
ShutdownOnErrorAsync(context, new InvalidOperationException($"Unexpected packet type {packet.PacketType}"));
break;
}
}
catch (Exception ex) when (!ex.IsFatal())
{
if (Logging.IsEnabled)
Logging.Error(context, $"Received a non-fatal exception while processing a received packet of type {packet.PacketType}, will shut down: {ex}", nameof(ProcessMessageAsync));
ShutdownOnErrorAsync(context, ex);
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, context.Name, packet.PacketType, nameof(ProcessMessageAsync));
}
}