dotnet/src/Azure.Iot.Operations.Protocol/Telemetry/TelemetrySender.cs (184 lines of code) (raw):

// Copyright (c) Microsoft Corporation. // Licensed under the MIT License. using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Runtime.Serialization; using Azure.Iot.Operations.Protocol.Models; using System.Diagnostics; namespace Azure.Iot.Operations.Protocol.Telemetry { public abstract class TelemetrySender<T> : IAsyncDisposable where T : class { private readonly ApplicationContext _applicationContext; private readonly IMqttPubSubClient _mqttClient; private readonly IPayloadSerializer _serializer; private bool _isDisposed; private bool _hasBeenValidated; /// <summary> /// The timeout value that every telemetry message sent by this sender will use if no timeout is specified. /// </summary> /// <remarks> /// This value sets the message expiry interval field on the underlying MQTT message. This means /// that, if the message is successfully delivered to the MQTT broker, the message will be discarded /// by the broker if the broker has not managed to start onward delivery to a matching subscriber within /// this timeout. /// /// If this value is equal to zero seconds, then the message will never expire at the broker. /// </remarks> private static readonly TimeSpan DefaultTelemetryTimeout = TimeSpan.FromSeconds(10); public string TopicPattern { get; init; } public string? TopicNamespace { get; set; } /// <summary> /// The topic token replacement map that this telemetry sender will use by default. Generally, this will include the token values /// for topic tokens such as "modelId" which should be the same for the duration of this command invoker's lifetime. /// </summary> /// <remarks> /// Tokens replacement values can also be specified per-telemetry by specifying the additionalTopicToken map in <see cref="SendTelemetryAsync(T, MqttQualityOfServiceLevel, TimeSpan?, CancellationToken)"/>. /// </remarks> public Dictionary<string, string> TopicTokenMap { get; protected set; } public TelemetrySender(ApplicationContext applicationContext, IMqttPubSubClient mqttClient, IPayloadSerializer serializer) { _applicationContext = applicationContext; _mqttClient = mqttClient; _serializer = serializer; _hasBeenValidated = false; TopicPattern = AttributeRetriever.GetAttribute<TelemetryTopicAttribute>(this)?.Topic ?? string.Empty; TopicTokenMap = new(); } /// <summary> /// Send telemetry with the default metadata. /// </summary> /// <param name="telemetry">The payload of the telemetry.</param> /// <param name="additionalTopicTokenMap"> /// The topic token replacement map to use in addition to <see cref="TopicTokenMap"/>.If this map /// contains any keys that <see cref="TopicTokenMap"/> also has, then values specified in this map will take precedence. /// </param> /// <param name="qos">The quality of service to send the telemetry with.</param> /// <param name="telemetryTimeout">How long the telemetry message will be available on the broker for a receiver to receive.</param> /// <param name="cancellationToken">Cancellation token.</param> public async Task SendTelemetryAsync(T telemetry, Dictionary<string, string>? additionalTopicTokenMap = null, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce, TimeSpan? telemetryTimeout = null, CancellationToken cancellationToken = default) { await SendTelemetryAsync(telemetry, new OutgoingTelemetryMetadata(), additionalTopicTokenMap, qos, telemetryTimeout, cancellationToken); } /// <summary> /// Send telemetry with custom metadata. /// </summary> /// <param name="telemetry">The payload of the telemetry.</param> /// <param name="metadata">The telemetry metadata.</param> /// <param name="additionalTopicTokenMap"> /// The topic token replacement map to use in addition to <see cref="TopicTokenMap"/>.If this map /// contains any keys that <see cref="TopicTokenMap"/> also has, then values specified in this map will take precedence. /// </param> /// <param name="qos">The quality of service to send the telemetry with.</param> /// <param name="telemetryTimeout">How long the telemetry message will be available on the broker for a receiver to receive.</param> /// <param name="cancellationToken">Cancellation token.</param> public async Task SendTelemetryAsync(T telemetry, OutgoingTelemetryMetadata metadata, Dictionary<string, string>? additionalTopicTokenMap = null, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce, TimeSpan? telemetryTimeout = null, CancellationToken cancellationToken = default) { ObjectDisposedException.ThrowIf(_isDisposed, this); ValidateAsNeeded(additionalTopicTokenMap); cancellationToken.ThrowIfCancellationRequested(); string? clientId = _mqttClient.ClientId; if (string.IsNullOrEmpty(clientId)) { throw new InvalidOperationException("No MQTT client Id configured. Must connect to MQTT broker before sending telemetry."); } TimeSpan verifiedMessageExpiryInterval = telemetryTimeout ?? DefaultTelemetryTimeout; if (verifiedMessageExpiryInterval <= TimeSpan.Zero) { throw AkriMqttException.GetConfigurationInvalidException("messageExpiryInterval", verifiedMessageExpiryInterval, "message expiry interval must have a positive value"); } // Rounding up to the nearest second verifiedMessageExpiryInterval = TimeSpan.FromSeconds(Math.Ceiling(verifiedMessageExpiryInterval.TotalSeconds)); if (verifiedMessageExpiryInterval.TotalSeconds > uint.MaxValue) { throw AkriMqttException.GetConfigurationInvalidException("messageExpiryInterval", verifiedMessageExpiryInterval, $"message expiry interval cannot be larger than {uint.MaxValue} seconds"); } StringBuilder telemTopic = new(); if (TopicNamespace != null) { if (!MqttTopicProcessor.IsValidReplacement(TopicNamespace)) { throw AkriMqttException.GetConfigurationInvalidException(nameof(TopicNamespace), TopicNamespace, "MQTT topic namespace is not valid"); } telemTopic.Append(TopicNamespace); telemTopic.Append('/'); } Dictionary<string, string> combinedTopicTokenMap = CombineTopicTokenMaps(TopicTokenMap, additionalTopicTokenMap); telemTopic.Append(MqttTopicProcessor.ResolveTopic(TopicPattern, combinedTopicTokenMap)); try { SerializedPayloadContext serializedPayloadContext = _serializer.ToBytes(telemetry); MqttApplicationMessage applicationMessage = new(telemTopic.ToString(), qos) { PayloadFormatIndicator = (MqttPayloadFormatIndicator)serializedPayloadContext.PayloadFormatIndicator, ContentType = serializedPayloadContext.ContentType, MessageExpiryInterval = (uint)verifiedMessageExpiryInterval.TotalSeconds, Payload = serializedPayloadContext.SerializedPayload, }; if (metadata?.CloudEvent is not null) { metadata.CloudEvent.Id ??= Guid.NewGuid().ToString(); metadata.CloudEvent.Time ??= DateTime.UtcNow; metadata.CloudEvent.Subject ??= telemTopic.ToString(); metadata.CloudEvent.DataContentType = serializedPayloadContext.ContentType; } // Update HLC and use as the timestamp. await _applicationContext.ApplicationHlc.UpdateNowAsync(cancellationToken: cancellationToken); metadata!.Timestamp = new HybridLogicalClock(_applicationContext.ApplicationHlc); if (metadata != null) { // The addition of the timestamp on to user properties happen below. applicationMessage.AddMetadata(metadata); } applicationMessage.AddUserProperty(AkriSystemProperties.ProtocolVersion, $"{TelemetryVersion.MajorProtocolVersion}.{TelemetryVersion.MinorProtocolVersion}"); applicationMessage.AddUserProperty(AkriSystemProperties.SourceId, clientId); MqttClientPublishResult pubAck = await _mqttClient.PublishAsync(applicationMessage, cancellationToken).ConfigureAwait(false); MqttClientPublishReasonCode pubReasonCode = pubAck.ReasonCode; if (pubReasonCode != MqttClientPublishReasonCode.Success) { throw new AkriMqttException($"Telemetry sending to the topic '{telemTopic}' failed due to an unsuccessful publishing with the error code {pubReasonCode}") { Kind = AkriMqttErrorKind.MqttError, IsShallow = false, IsRemote = false, }; } Trace.TraceInformation($"Telemetry sent successfully to the topic '{telemTopic}'"); } catch (SerializationException ex) { Trace.TraceError($"The message payload cannot be serialized due to error: {ex}"); throw new AkriMqttException("The message payload cannot be serialized.", ex) { Kind = AkriMqttErrorKind.PayloadInvalid, IsShallow = true, IsRemote = false, }; } catch (Exception ex) when (ex is not AkriMqttException) { Trace.TraceError($"Sending telemetry failed due to a MQTT communication error: {ex}"); throw new AkriMqttException($"Sending telemetry failed due to a MQTT communication error: {ex.Message}.", ex) { Kind = AkriMqttErrorKind.Timeout, IsShallow = false, IsRemote = false, }; } } private void ValidateAsNeeded(Dictionary<string, string>? additionalTopicTokenMap) { if (_hasBeenValidated) { return; } if (_mqttClient.ProtocolVersion != MqttProtocolVersion.V500) { throw AkriMqttException.GetConfigurationInvalidException( "MQTTClient.ProtocolVersion", _mqttClient.ProtocolVersion, "The provided MQTT client is not configured for MQTT version 5"); } Dictionary<string, string> combinedTopicTokenMap = CombineTopicTokenMaps(TopicTokenMap, additionalTopicTokenMap); PatternValidity patternValidity = MqttTopicProcessor.ValidateTopicPattern(TopicPattern, combinedTopicTokenMap, requireReplacement: true, out string errMsg, out string? errToken, out string? errReplacement); if (patternValidity != PatternValidity.Valid) { throw patternValidity switch { PatternValidity.MissingReplacement => AkriMqttException.GetArgumentInvalidException(null, errToken!, null, errMsg), PatternValidity.InvalidResidentReplacement => AkriMqttException.GetConfigurationInvalidException(errToken!, errReplacement!, errMsg), _ => AkriMqttException.GetConfigurationInvalidException(nameof(TopicPattern), TopicPattern, errMsg), }; } _hasBeenValidated = true; } public virtual async ValueTask DisposeAsync() { await DisposeAsyncCore(false); GC.SuppressFinalize(this); } public virtual async ValueTask DisposeAsync(bool disposing) { await DisposeAsyncCore(disposing); } private static Dictionary<string, string> CombineTopicTokenMaps(Dictionary<string, string> baseMap, Dictionary<string, string>? additionalMap) { Dictionary<string, string> combinedTopicTokenMap = new(baseMap); additionalMap ??= new(); foreach (string topicTokenKey in additionalMap.Keys) { combinedTopicTokenMap.TryAdd(topicTokenKey, additionalMap[topicTokenKey]); } return combinedTopicTokenMap; } protected virtual async ValueTask DisposeAsyncCore(bool disposing) { if (!_isDisposed) { if (disposing) { await _mqttClient.DisposeAsync(disposing); } _isDisposed = true; } } } }