public async Task SendTelemetryAsync()

in dotnet/src/Azure.Iot.Operations.Protocol/Telemetry/TelemetrySender.cs [89:198]


        public async Task SendTelemetryAsync(T telemetry, OutgoingTelemetryMetadata metadata, Dictionary<string, string>? additionalTopicTokenMap = null, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce, TimeSpan? telemetryTimeout = null, CancellationToken cancellationToken = default)
        {
            // Serializes the telemetry payload and publishes it as a single MQTT message.
            //
            // telemetry               - payload handed to the configured serializer.
            // metadata                - optional-in-practice metadata; when present its CloudEvent defaults
            //                           and Timestamp are filled in here (the object is mutated) and it is
            //                           attached to the outgoing message.
            // additionalTopicTokenMap - per-call topic tokens combined with TopicTokenMap.
            // qos                     - MQTT quality of service used for the publish.
            // telemetryTimeout        - message expiry interval; falls back to DefaultTelemetryTimeout,
            //                           is rounded up to whole seconds and must fit in a uint.
            // cancellationToken       - checked up front and passed to the HLC update and the publish.
            //
            // Throws ObjectDisposedException when this sender is disposed, InvalidOperationException
            // when the MQTT client has no client id, and AkriMqttException for invalid configuration,
            // serialization failures, unsuccessful publish reason codes and any other failure raised
            // while building or publishing the message.
            ObjectDisposedException.ThrowIf(_isDisposed, this);
            ValidateAsNeeded(additionalTopicTokenMap);
            cancellationToken.ThrowIfCancellationRequested();

            string? clientId = _mqttClient.ClientId;
            if (string.IsNullOrEmpty(clientId))
            {
                throw new InvalidOperationException("No MQTT client Id configured. Must connect to MQTT broker before sending telemetry.");
            }

            TimeSpan requestedExpiryInterval = telemetryTimeout ?? DefaultTelemetryTimeout;

            if (requestedExpiryInterval <= TimeSpan.Zero)
            {
                throw AkriMqttException.GetConfigurationInvalidException("messageExpiryInterval", requestedExpiryInterval, "message expiry interval must have a positive value");
            }

            // Round up to the nearest second. The rounding and the range check are done on the raw
            // double so that very large intervals (e.g. TimeSpan.MaxValue) are rejected with a
            // configuration error instead of overflowing inside TimeSpan.FromSeconds.
            double expirySeconds = Math.Ceiling(requestedExpiryInterval.TotalSeconds);

            if (expirySeconds > uint.MaxValue)
            {
                throw AkriMqttException.GetConfigurationInvalidException("messageExpiryInterval", requestedExpiryInterval, $"message expiry interval cannot be larger than {uint.MaxValue} seconds");
            }

            StringBuilder topicBuilder = new();

            if (TopicNamespace != null)
            {
                if (!MqttTopicProcessor.IsValidReplacement(TopicNamespace))
                {
                    throw AkriMqttException.GetConfigurationInvalidException(nameof(TopicNamespace), TopicNamespace, "MQTT topic namespace is not valid");
                }

                topicBuilder.Append(TopicNamespace);
                topicBuilder.Append('/');
            }

            Dictionary<string, string> combinedTopicTokenMap = CombineTopicTokenMaps(TopicTokenMap, additionalTopicTokenMap);

            topicBuilder.Append(MqttTopicProcessor.ResolveTopic(TopicPattern, combinedTopicTokenMap));

            // The topic is final from here on; materialize it once for the message, the CloudEvent
            // subject and the log/exception texts.
            string topic = topicBuilder.ToString();

            try
            {
                SerializedPayloadContext serializedPayloadContext = _serializer.ToBytes(telemetry);
                MqttApplicationMessage applicationMessage = new(topic, qos)
                {
                    PayloadFormatIndicator = (MqttPayloadFormatIndicator)serializedPayloadContext.PayloadFormatIndicator,
                    ContentType = serializedPayloadContext.ContentType,
                    MessageExpiryInterval = (uint)expirySeconds,
                    Payload = serializedPayloadContext.SerializedPayload,
                };

                if (metadata?.CloudEvent is not null)
                {
                    // Fill in only the CloudEvent fields the caller left unset; the content type
                    // always mirrors the serialized payload.
                    metadata.CloudEvent.Id ??= Guid.NewGuid().ToString();
                    metadata.CloudEvent.Time ??= DateTime.UtcNow;
                    metadata.CloudEvent.Subject ??= topic;
                    metadata.CloudEvent.DataContentType = serializedPayloadContext.ContentType;
                }

                // Update HLC and use as the timestamp.
                await _applicationContext.ApplicationHlc.UpdateNowAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

                if (metadata is not null)
                {
                    // The timestamp is only stamped when metadata was supplied; dereferencing a
                    // missing metadata object here used to surface as a misleading timeout error.
                    metadata.Timestamp = new HybridLogicalClock(_applicationContext.ApplicationHlc);

                    // The addition of the timestamp on to user properties happen below.
                    applicationMessage.AddMetadata(metadata);
                }

                applicationMessage.AddUserProperty(AkriSystemProperties.ProtocolVersion, $"{TelemetryVersion.MajorProtocolVersion}.{TelemetryVersion.MinorProtocolVersion}");
                applicationMessage.AddUserProperty(AkriSystemProperties.SourceId, clientId);

                MqttClientPublishResult pubAck = await _mqttClient.PublishAsync(applicationMessage, cancellationToken).ConfigureAwait(false);
                MqttClientPublishReasonCode pubReasonCode = pubAck.ReasonCode;
                if (pubReasonCode != MqttClientPublishReasonCode.Success)
                {
                    throw new AkriMqttException($"Telemetry sending to the topic '{topic}' failed due to an unsuccessful publishing with the error code {pubReasonCode}")
                    {
                        Kind = AkriMqttErrorKind.MqttError,
                        IsShallow = false,
                        IsRemote = false,
                    };
                }
                Trace.TraceInformation($"Telemetry sent successfully to the topic '{topic}'");
            }
            catch (SerializationException ex)
            {
                Trace.TraceError($"The message payload cannot be serialized due to error: {ex}");
                throw new AkriMqttException("The message payload cannot be serialized.", ex)
                {
                    Kind = AkriMqttErrorKind.PayloadInvalid,
                    IsShallow = true,
                    IsRemote = false,
                };
            }
            catch (Exception ex) when (ex is not AkriMqttException)
            {
                // AkriMqttException instances thrown above pass through untouched; everything else
                // is wrapped and reported with the Timeout kind.
                // NOTE(review): this filter also matches OperationCanceledException raised by the
                // awaited calls, so caller cancellation is reported as Timeout - confirm intended.
                Trace.TraceError($"Sending telemetry failed due to a MQTT communication error: {ex}");
                throw new AkriMqttException($"Sending telemetry failed due to a MQTT communication error: {ex.Message}.", ex)
                {
                    Kind = AkriMqttErrorKind.Timeout,
                    IsShallow = false,
                    IsRemote = false,
                };
            }
        }