iothub/device/src/Transport/AmqpIot/AmqpIotSendingLink.cs (189 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Amqp; using Microsoft.Azure.Amqp.Framing; using Microsoft.Azure.Devices.Client.Exceptions; using Microsoft.Azure.Devices.Client.Extensions; using Microsoft.Azure.Devices.Shared; using Newtonsoft.Json; namespace Microsoft.Azure.Devices.Client.Transport.AmqpIot { internal class AmqpIotSendingLink { public event EventHandler Closed; private readonly SendingAmqpLink _sendingAmqpLink; public AmqpIotSendingLink(SendingAmqpLink sendingAmqpLink) { _sendingAmqpLink = sendingAmqpLink; _sendingAmqpLink.Closed += SendingAmqpLinkClosed; } private void SendingAmqpLinkClosed(object sender, EventArgs e) { if (Logging.IsEnabled) Logging.Enter(this, nameof(SendingAmqpLinkClosed)); Closed?.Invoke(this, e); if (Logging.IsEnabled) Logging.Exit(this, nameof(SendingAmqpLinkClosed)); } internal Task CloseAsync(CancellationToken cancellationToken) { if (Logging.IsEnabled) Logging.Enter(this, nameof(CloseAsync)); return _sendingAmqpLink.CloseAsync(cancellationToken); } internal void SafeClose() { if (Logging.IsEnabled) Logging.Enter(this, nameof(SafeClose)); _sendingAmqpLink.SafeClose(); if (Logging.IsEnabled) Logging.Exit(this, nameof(SafeClose)); } internal bool IsClosing() { return _sendingAmqpLink.IsClosing(); } #region Telemetry handling internal async Task<AmqpIotOutcome> SendMessageAsync(Message message, CancellationToken cancellationToken) { if (Logging.IsEnabled) Logging.Enter(this, message, nameof(SendMessageAsync)); // After this message is sent, we will return the outcome that has no references to the message // So it can safely be disposed. using AmqpMessage amqpMessage = AmqpIotMessageConverter.MessageToAmqpMessage(message); Outcome outcome = await SendAmqpMessageAsync(amqpMessage, cancellationToken).ConfigureAwait(false); if (Logging.IsEnabled) Logging.Exit(this, message, nameof(SendMessageAsync)); return new AmqpIotOutcome(outcome); } internal async Task<AmqpIotOutcome> SendMessagesAsync(IEnumerable<Message> messages, CancellationToken cancellationToken) { if (Logging.IsEnabled) Logging.Enter(this, nameof(SendMessagesAsync)); cancellationToken.ThrowIfCancellationRequested(); // List to hold messages in AMQP friendly format var messageList = new List<Data>(messages.Count()); foreach (Message message in messages) { #pragma warning disable CA2000 // Dispose objects before losing scope using AmqpMessage amqpMessage = AmqpIotMessageConverter.MessageToAmqpMessage(message); #pragma warning restore CA2000 // Dispose objects before losing scope var data = new Data { Value = AmqpIotMessageConverter.ReadStream(amqpMessage.ToStream()), }; messageList.Add(data); } using var batchMessage = AmqpMessage.Create(messageList); batchMessage.MessageFormat = AmqpConstants.AmqpBatchedMessageFormat; Outcome outcome = await SendAmqpMessageAsync(batchMessage, cancellationToken).ConfigureAwait(false); var amqpIotOutcome = new AmqpIotOutcome(outcome); amqpIotOutcome.ThrowIfNotAccepted(); if (Logging.IsEnabled) Logging.Exit(this, nameof(SendMessagesAsync)); return amqpIotOutcome; } private async Task<Outcome> SendAmqpMessageAsync(AmqpMessage amqpMessage, CancellationToken cancellationToken) { if (Logging.IsEnabled) Logging.Enter(this, nameof(SendAmqpMessageAsync)); try { return await _sendingAmqpLink .SendMessageAsync( amqpMessage, new ArraySegment<byte>(Guid.NewGuid().ToByteArray()), AmqpConstants.NullBinary, cancellationToken) .ConfigureAwait(false); } catch (Exception e) when (!e.IsFatal()) { Exception ex = AmqpIotExceptionAdapter.ConvertToIotHubException(e, _sendingAmqpLink); if (ReferenceEquals(e, ex)) { throw; } else { if (ex is AmqpIotResourceException) { _sendingAmqpLink.SafeClose(); throw new IotHubCommunicationException(ex.Message, ex); } throw ex; } } finally { if (Logging.IsEnabled) Logging.Exit(this, nameof(SendAmqpMessageAsync)); } } #endregion Telemetry handling #region Method handling internal async Task<AmqpIotOutcome> SendMethodResponseAsync(MethodResponseInternal methodResponse, CancellationToken cancellationToken) { if (Logging.IsEnabled) Logging.Enter(this, methodResponse, nameof(SendMethodResponseAsync)); cancellationToken.ThrowIfCancellationRequested(); using AmqpMessage amqpMessage = AmqpIotMessageConverter.ConvertMethodResponseInternalToAmqpMessage(methodResponse); AmqpIotMessageConverter.PopulateAmqpMessageFromMethodResponse(amqpMessage, methodResponse); Outcome outcome = await SendAmqpMessageAsync(amqpMessage, cancellationToken).ConfigureAwait(false); if (Logging.IsEnabled) Logging.Exit(this, nameof(SendMethodResponseAsync)); return new AmqpIotOutcome(outcome); } #endregion Method handling #region Twin handling internal async Task<AmqpIotOutcome> SendTwinGetMessageAsync(string correlationId, CancellationToken cancellationToken) { if (Logging.IsEnabled) Logging.Enter(this, nameof(SendTwinGetMessageAsync)); using var amqpMessage = AmqpMessage.Create(); amqpMessage.Properties.CorrelationId = correlationId; amqpMessage.MessageAnnotations.Map["operation"] = "GET"; Outcome outcome = await SendAmqpMessageAsync(amqpMessage, cancellationToken).ConfigureAwait(false); if (Logging.IsEnabled) Logging.Exit(this, nameof(SendTwinGetMessageAsync)); return new AmqpIotOutcome(outcome); } internal async Task<AmqpIotOutcome> SendTwinPatchMessageAsync( string correlationId, TwinCollection reportedProperties, CancellationToken cancellationToken) { if (Logging.IsEnabled) Logging.Enter(this, nameof(SendTwinPatchMessageAsync)); string body = JsonConvert.SerializeObject(reportedProperties, JsonSerializerSettingsInitializer.GetJsonSerializerSettings()); var bodyStream = new MemoryStream(Encoding.UTF8.GetBytes(body)); using var amqpMessage = AmqpMessage.Create(bodyStream, true); amqpMessage.Properties.CorrelationId = correlationId; amqpMessage.MessageAnnotations.Map["operation"] = "PATCH"; amqpMessage.MessageAnnotations.Map["resource"] = "/properties/reported"; amqpMessage.MessageAnnotations.Map["version"] = null; Outcome outcome = await SendAmqpMessageAsync(amqpMessage, cancellationToken).ConfigureAwait(false); if (Logging.IsEnabled) Logging.Exit(this, nameof(SendTwinPatchMessageAsync)); return new AmqpIotOutcome(outcome); } internal async Task<AmqpIotOutcome> SubscribeToDesiredPropertiesAsync(string correlationId, CancellationToken cancellationToken) { if (Logging.IsEnabled) Logging.Enter(this, nameof(SubscribeToDesiredPropertiesAsync)); using var amqpMessage = AmqpMessage.Create(); amqpMessage.Properties.CorrelationId = correlationId; amqpMessage.MessageAnnotations.Map["operation"] = "PUT"; amqpMessage.MessageAnnotations.Map["resource"] = "/notifications/twin/properties/desired"; amqpMessage.MessageAnnotations.Map["version"] = null; Outcome outcome = await SendAmqpMessageAsync(amqpMessage, cancellationToken).ConfigureAwait(false); if (Logging.IsEnabled) Logging.Exit(this, nameof(SubscribeToDesiredPropertiesAsync)); return new AmqpIotOutcome(outcome); } #endregion Twin handling } }