in dotnet/src/Azure.Iot.Operations.Protocol/Telemetry/TelemetryReceiver.cs [72:143]
private async Task MessageReceivedCallbackAsync(MqttApplicationMessageReceivedEventArgs args)
{
Trace.TraceInformation($"Telemetry received from {args.ApplicationMessage.Topic}");
string telemTopicFilter = GetTelemetryTopic();
if (MqttTopicProcessor.DoesTopicMatchFilter(args.ApplicationMessage.Topic, telemTopicFilter))
{
string? requestProtocolVersion = args.ApplicationMessage.UserProperties?.FirstOrDefault(p => p.Name == AkriSystemProperties.ProtocolVersion)?.Value;
if (!ProtocolVersion.TryParseProtocolVersion(requestProtocolVersion, out ProtocolVersion? protocolVersion))
{
Trace.TraceError($"Telemetry with CorrelationId {args.ApplicationMessage.CorrelationData} provided a malformed protocol version {requestProtocolVersion}. The telemetry will be ignored by this receiver.");
return;
}
if (!_supportedMajorProtocolVersions.Contains(protocolVersion!.MajorVersion))
{
Trace.TraceError($"Telemetry with CorrelationId {args.ApplicationMessage.CorrelationData} requested an unsupported protocol version {requestProtocolVersion}. This telemetry reciever supports versions {ProtocolVersion.ToString(_supportedMajorProtocolVersions)}. The telemetry will be ignored by this receiver.");
return;
}
args.AutoAcknowledge = false;
DateTime messageReceivedTime = WallClock.UtcNow;
TimeSpan telemetryTimeout = args.ApplicationMessage.MessageExpiryInterval != default ? TimeSpan.FromSeconds(args.ApplicationMessage.MessageExpiryInterval) : DefaultTelemetryTimeout;
DateTime telemetryExpirationTime = messageReceivedTime + telemetryTimeout;
string sourceId = args.ApplicationMessage.UserProperties?.FirstOrDefault(p => p.Name == AkriSystemProperties.SourceId)?.Value ?? string.Empty;
if (OnTelemetryReceived == null)
{
await GetDispatcher()(null, async () => { await args.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false); }).ConfigureAwait(false);
return;
}
try
{
T serializedPayload = _serializer.FromBytes<T>(args.ApplicationMessage.Payload, args.ApplicationMessage.ContentType, args.ApplicationMessage.PayloadFormatIndicator);
IncomingTelemetryMetadata metadata = new(args.ApplicationMessage, args.PacketIdentifier, TopicPattern);
if (metadata.Timestamp != null)
{
// Update application HLC against received TS
await _applicationContext.ApplicationHlc.UpdateWithOtherAsync(metadata.Timestamp);
}
else
{
Trace.TraceInformation($"No timestamp present in telemetry received metadata.");
}
async Task TelemFunc()
{
try
{
await OnTelemetryReceived(sourceId, serializedPayload, metadata);
}
catch (Exception innerEx)
{
Trace.TraceError($"Exception thrown while executing telemetry received callback: {innerEx.Message}");
}
}
await GetDispatcher()(TelemFunc, async () => { await args.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false); }).ConfigureAwait(false);
}
catch (Exception outerEx)
{
await GetDispatcher()(null, async () => { await args.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false); }).ConfigureAwait(false);
Trace.TraceError($"Exception thrown while deserializing payload, callback skipped: {outerEx.Message}");
}
}
}