iothub/service/src/Feedback/AmqpFeedbackReceiver.cs (157 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Amqp; using Microsoft.Azure.Devices.Common; using Microsoft.Azure.Devices.Common.Extensions; using Microsoft.Azure.Devices.Shared; namespace Microsoft.Azure.Devices { internal sealed class AmqpFeedbackReceiver : FeedbackReceiver<FeedbackBatch>, IDisposable { private readonly FaultTolerantAmqpObject<ReceivingAmqpLink> _faultTolerantReceivingLink; private readonly string _receivingPath; public AmqpFeedbackReceiver(IotHubConnection iotHubConnection) { Connection = iotHubConnection; OpenTimeout = IotHubConnection.DefaultOpenTimeout; OperationTimeout = IotHubConnection.DefaultOperationTimeout; _receivingPath = AmqpClientHelper.GetReceivingPath(EndpointKind.Feedback); _faultTolerantReceivingLink = new FaultTolerantAmqpObject<ReceivingAmqpLink>(CreateReceivingLinkAsync, Connection.CloseLink); } public TimeSpan OpenTimeout { get; private set; } public TimeSpan OperationTimeout { get; private set; } public IotHubConnection Connection { get; private set; } public Task OpenAsync() { Logging.Enter(this, nameof(OpenAsync)); try { return _faultTolerantReceivingLink.GetReceivingLinkAsync(); } finally { Logging.Exit(this, nameof(OpenAsync)); } } public Task CloseAsync() { Logging.Enter(this, nameof(CloseAsync)); try { return _faultTolerantReceivingLink.CloseAsync(); } finally { Logging.Exit(this, nameof(CloseAsync)); } } [Obsolete("Use ReceiveAsync(CancellationToken cancellationToken).")] public override Task<FeedbackBatch> ReceiveAsync() { return ReceiveAsync(OperationTimeout); } [Obsolete("Use ReceiveAsync(CancellationToken cancellationToken).")] public override async Task<FeedbackBatch> ReceiveAsync(TimeSpan timeout) { using var cts = new CancellationTokenSource(timeout); return await ReceiveAsync(cts.Token).ConfigureAwait(false); } public override async Task<FeedbackBatch> ReceiveAsync(CancellationToken cancellationToken) { Logging.Enter(this, nameof(ReceiveAsync)); try { cancellationToken.ThrowIfCancellationRequested(); ReceivingAmqpLink receivingLink = await _faultTolerantReceivingLink.GetReceivingLinkAsync().ConfigureAwait(false); AmqpMessage amqpMessage = await receivingLink.ReceiveMessageAsync(cancellationToken).ConfigureAwait(false); Logging.Info(this, $"Message received is [{amqpMessage}]", nameof(ReceiveAsync)); if (amqpMessage != null) { using (amqpMessage) { AmqpClientHelper.ValidateContentType(amqpMessage, CommonConstants.BatchedFeedbackContentType); IEnumerable<FeedbackRecord> records = await AmqpClientHelper .GetObjectFromAmqpMessageAsync<IEnumerable<FeedbackRecord>>(amqpMessage).ConfigureAwait(false); return new FeedbackBatch { EnqueuedTime = (DateTime)amqpMessage.MessageAnnotations.Map[MessageSystemPropertyNames.EnqueuedTime], LockToken = new Guid(amqpMessage.DeliveryTag.Array).ToString(), Records = records, UserId = Encoding.UTF8.GetString(amqpMessage.Properties.UserId.Array, amqpMessage.Properties.UserId.Offset, amqpMessage.Properties.UserId.Count) }; } } return null; } catch (Exception exception) { Logging.Error(this, exception, nameof(ReceiveAsync)); if (exception.IsFatal()) { throw; } throw AmqpClientHelper.ToIotHubClientContract(exception); } finally { Logging.Exit(this, nameof(ReceiveAsync)); } } private Task<ReceivingAmqpLink> CreateReceivingLinkAsync(TimeSpan timeout) { Logging.Enter(this, timeout, nameof(CreateReceivingLinkAsync)); try { return Connection.CreateReceivingLinkAsync(_receivingPath, timeout, 0); } finally { Logging.Exit(this, timeout, nameof(CreateReceivingLinkAsync)); } } [Obsolete("Use CompleteAsync(FeeddbackBatch feedback, CancellationToken cancellationToken).")] public override Task CompleteAsync(FeedbackBatch feedback) { return AmqpClientHelper.DisposeMessageAsync( _faultTolerantReceivingLink, feedback.LockToken, AmqpConstants.AcceptedOutcome, false); // Feedback messages are sent by the service one at a time, so batching the acks is pointless } public override Task CompleteAsync(FeedbackBatch feedback, CancellationToken cancellationToken) { return AmqpClientHelper.DisposeMessageAsync( _faultTolerantReceivingLink, feedback.LockToken, AmqpConstants.AcceptedOutcome, false, // Feedback messages are sent by the service one at a time, so batching the acks is pointless cancellationToken); } [Obsolete("Use AbandonAsync(FeedbackBatch feedback, CancellationToken cancellationToken).")] public override Task AbandonAsync(FeedbackBatch feedback) { return AmqpClientHelper.DisposeMessageAsync( _faultTolerantReceivingLink, feedback.LockToken, AmqpConstants.ReleasedOutcome, false); // Feedback messages are sent by the service one at a time, so batching the acks is pointless } public override Task AbandonAsync(FeedbackBatch feedback, CancellationToken cancellationToken) { return AmqpClientHelper.DisposeMessageAsync( _faultTolerantReceivingLink, feedback.LockToken, AmqpConstants.ReleasedOutcome, false, // Feedback messages are sent by the service one at a time, so batching the acks is pointless cancellationToken); } /// <inheritdoc/> public void Dispose() { _faultTolerantReceivingLink.Dispose(); } } }