in dotnet/src/Azure.Iot.Operations.Protocol/Telemetry/TelemetrySender.cs [89:198]
public async Task SendTelemetryAsync(T telemetry, OutgoingTelemetryMetadata metadata, Dictionary<string, string>? additionalTopicTokenMap = null, MqttQualityOfServiceLevel qos = MqttQualityOfServiceLevel.AtLeastOnce, TimeSpan? telemetryTimeout = null, CancellationToken cancellationToken = default)
{
ObjectDisposedException.ThrowIf(_isDisposed, this);
ValidateAsNeeded(additionalTopicTokenMap);
cancellationToken.ThrowIfCancellationRequested();
string? clientId = _mqttClient.ClientId;
if (string.IsNullOrEmpty(clientId))
{
throw new InvalidOperationException("No MQTT client Id configured. Must connect to MQTT broker before sending telemetry.");
}
TimeSpan verifiedMessageExpiryInterval = telemetryTimeout ?? DefaultTelemetryTimeout;
if (verifiedMessageExpiryInterval <= TimeSpan.Zero)
{
throw AkriMqttException.GetConfigurationInvalidException("messageExpiryInterval", verifiedMessageExpiryInterval, "message expiry interval must have a positive value");
}
// Rounding up to the nearest second
verifiedMessageExpiryInterval = TimeSpan.FromSeconds(Math.Ceiling(verifiedMessageExpiryInterval.TotalSeconds));
if (verifiedMessageExpiryInterval.TotalSeconds > uint.MaxValue)
{
throw AkriMqttException.GetConfigurationInvalidException("messageExpiryInterval", verifiedMessageExpiryInterval, $"message expiry interval cannot be larger than {uint.MaxValue} seconds");
}
StringBuilder telemTopic = new();
if (TopicNamespace != null)
{
if (!MqttTopicProcessor.IsValidReplacement(TopicNamespace))
{
throw AkriMqttException.GetConfigurationInvalidException(nameof(TopicNamespace), TopicNamespace, "MQTT topic namespace is not valid");
}
telemTopic.Append(TopicNamespace);
telemTopic.Append('/');
}
Dictionary<string, string> combinedTopicTokenMap = CombineTopicTokenMaps(TopicTokenMap, additionalTopicTokenMap);
telemTopic.Append(MqttTopicProcessor.ResolveTopic(TopicPattern, combinedTopicTokenMap));
try
{
SerializedPayloadContext serializedPayloadContext = _serializer.ToBytes(telemetry);
MqttApplicationMessage applicationMessage = new(telemTopic.ToString(), qos)
{
PayloadFormatIndicator = (MqttPayloadFormatIndicator)serializedPayloadContext.PayloadFormatIndicator,
ContentType = serializedPayloadContext.ContentType,
MessageExpiryInterval = (uint)verifiedMessageExpiryInterval.TotalSeconds,
Payload = serializedPayloadContext.SerializedPayload,
};
if (metadata?.CloudEvent is not null)
{
metadata.CloudEvent.Id ??= Guid.NewGuid().ToString();
metadata.CloudEvent.Time ??= DateTime.UtcNow;
metadata.CloudEvent.Subject ??= telemTopic.ToString();
metadata.CloudEvent.DataContentType = serializedPayloadContext.ContentType;
}
// Update HLC and use as the timestamp.
await _applicationContext.ApplicationHlc.UpdateNowAsync(cancellationToken: cancellationToken);
metadata!.Timestamp = new HybridLogicalClock(_applicationContext.ApplicationHlc);
if (metadata != null)
{
// The addition of the timestamp on to user properties happen below.
applicationMessage.AddMetadata(metadata);
}
applicationMessage.AddUserProperty(AkriSystemProperties.ProtocolVersion, $"{TelemetryVersion.MajorProtocolVersion}.{TelemetryVersion.MinorProtocolVersion}");
applicationMessage.AddUserProperty(AkriSystemProperties.SourceId, clientId);
MqttClientPublishResult pubAck = await _mqttClient.PublishAsync(applicationMessage, cancellationToken).ConfigureAwait(false);
MqttClientPublishReasonCode pubReasonCode = pubAck.ReasonCode;
if (pubReasonCode != MqttClientPublishReasonCode.Success)
{
throw new AkriMqttException($"Telemetry sending to the topic '{telemTopic}' failed due to an unsuccessful publishing with the error code {pubReasonCode}")
{
Kind = AkriMqttErrorKind.MqttError,
IsShallow = false,
IsRemote = false,
};
}
Trace.TraceInformation($"Telemetry sent successfully to the topic '{telemTopic}'");
}
catch (SerializationException ex)
{
Trace.TraceError($"The message payload cannot be serialized due to error: {ex}");
throw new AkriMqttException("The message payload cannot be serialized.", ex)
{
Kind = AkriMqttErrorKind.PayloadInvalid,
IsShallow = true,
IsRemote = false,
};
}
catch (Exception ex) when (ex is not AkriMqttException)
{
Trace.TraceError($"Sending telemetry failed due to a MQTT communication error: {ex}");
throw new AkriMqttException($"Sending telemetry failed due to a MQTT communication error: {ex.Message}.", ex)
{
Kind = AkriMqttErrorKind.Timeout,
IsShallow = false,
IsRemote = false,
};
}
}