in iothub/device/src/Transport/Mqtt/MqttIotHubAdapter.cs [164:224]
public override async Task WriteAsync(IChannelHandlerContext context, object data)
{
if (Logging.IsEnabled)
Logging.Enter(this, context.Name, data, nameof(WriteAsync));
try
{
if (IsInState(StateFlags.Closed))
{
if (Logging.IsEnabled)
Logging.Error(this, "When writing data to the MQTT transport layer, it had already been closed.", nameof(WriteAsync));
throw new IotHubCommunicationException("MQTT is disconnected.");
}
if (data is Message message)
{
await SendMessageAsync(context, message).ConfigureAwait(true);
return;
}
if (data is string packetIdString)
{
await AcknowledgeAsync(context, packetIdString).ConfigureAwait(true);
return;
}
if (data is DisconnectPacket)
{
await WriteMessageAsync(context, data, s_shutdownOnWriteErrorHandler).ConfigureAwait(true);
return;
}
if (data is SubscribePacket)
{
await SubscribeAsync(context, data as SubscribePacket).ConfigureAwait(true);
return;
}
if (data is UnsubscribePacket)
{
await UnSubscribeAsync(context, data as UnsubscribePacket).ConfigureAwait(true);
return;
}
throw new InvalidOperationException($"Unexpected data type: '{data.GetType().Name}'");
}
catch (Exception ex) when (!ex.IsFatal())
{
if (Logging.IsEnabled)
Logging.Error(this, $"Received a non-fatal exception while writing data to the MQTT transport layer; will shut down: {ex}", nameof(WriteAsync));
ShutdownOnErrorAsync(context, ex);
throw;
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, context.Name, nameof(WriteAsync));
}
}