src/SendingAmqpLink.cs (314 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. namespace Microsoft.Azure.Amqp { using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Amqp.Framing; using Microsoft.Azure.Amqp.Transaction; /// <summary> /// An AMQP link for sending messages. /// </summary> public sealed class SendingAmqpLink : AmqpLink, IWorkDelegate<AmqpMessage> { static readonly TimeSpan MinRequestCreditWindow = TimeSpan.FromSeconds(10); readonly SerializedWorker<AmqpMessage> pendingDeliveries; // need link credit readonly WorkCollection<ArraySegment<byte>, SendAsyncResult, Outcome> inflightSends; Action<Delivery> dispositionListener; DateTime lastFlowRequestTime; ICollection<Delivery> deliveriesToBeResentUponRecovery; /// <summary> /// Initializes the object. /// </summary> /// <param name="settings">The link settings.</param> public SendingAmqpLink(AmqpLinkSettings settings) : this(null, settings) { } /// <summary> /// Initializes the object. /// </summary> /// <param name="session">The session where the link is created.</param> /// <param name="settings">The link settings.</param> public SendingAmqpLink(AmqpSession session, AmqpLinkSettings settings) : base("sender", session, settings) { // TODO: Add capability negotiation logic for BatchedMessageFormat to this.Settings this.pendingDeliveries = new SerializedWorker<AmqpMessage>(this); this.inflightSends = new WorkCollection<ArraySegment<byte>, SendAsyncResult, Outcome>(ByteArrayComparer.Instance); this.lastFlowRequestTime = DateTime.UtcNow; } /// <summary> /// Registers a disposition listener to handler delivery state changes. /// </summary> /// <param name="dispositionListener"></param> public void RegisterDispositionListener(Action<Delivery> dispositionListener) { if (Interlocked.Exchange(ref this.dispositionListener, dispositionListener) != null) { throw new InvalidOperationException(CommonResources.DispositionListenerAlreadyRegistered); } } /// <summary> /// Sends a message and does not wait for the disposition. /// </summary> /// <param name="message">The message to send.</param> /// <param name="deliveryTag">The delivery tag.</param> /// <param name="txnId">The transaction id.</param> public void SendMessageNoWait(AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId) { this.SendMessageInternal(message, deliveryTag, txnId); } /// <summary> /// Starts an operation to send a message. /// </summary> /// <param name="message">The message to send.</param> /// <returns>A task that completes with the delivery outcome.</returns> public Task<Outcome> SendMessageAsync(AmqpMessage message) { return this.SendMessageAsync(message, CreateTag(), AmqpConstants.NullBinary, CancellationToken.None); } /// <summary> /// Starts an operation to send a message. /// </summary> /// <param name="message">The message to send.</param> /// <param name="deliveryTag">The delivery tag.</param> /// <param name="txnId">The transaction id.</param> /// <param name="timeout">The operation timeout.</param> /// <returns>A task that completes with the delivery outcome.</returns> public Task<Outcome> SendMessageAsync(AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, TimeSpan timeout) { return Task.Factory.FromAsync( static (p, t, k, c, s) => ((SendingAmqpLink)s).BeginSendMessage(p.Message, p.DeliveryTag, p.TxnId, t, k, c, s), static r => ((SendingAmqpLink)r.AsyncState).EndSendMessage(r), new SendMessageParam(message, deliveryTag, txnId), timeout, CancellationToken.None, this); } /// <summary> /// Starts an operation to send a message. /// </summary> /// <param name="message">The message to send.</param> /// <param name="deliveryTag">The delivery tag.</param> /// <param name="txnId">The transaction id.</param> /// <param name="cancellationToken">A cancellation token that can be used to signal the asynchronous operation should be canceled.</param> /// <returns></returns> public Task<Outcome> SendMessageAsync(AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, CancellationToken cancellationToken) { return Task.Factory.FromAsync( static (p, t, k, c, s) => ((SendingAmqpLink)s).BeginSendMessage(p.Message, p.DeliveryTag, p.TxnId, t, k, c, s), static r => ((SendingAmqpLink)r.AsyncState).EndSendMessage(r), new SendMessageParam(message, deliveryTag, txnId), this.OperationTimeout, cancellationToken, this); } /// <summary> /// Begins an operation to send a message. /// </summary> /// <param name="message">The message to send.</param> /// <param name="deliveryTag">The delivery tag.</param> /// <param name="txnId">The transaction id.</param> /// <param name="timeout">The operation timeout.</param> /// <param name="callback">The callback to invoke when the operation completes.</param> /// <param name="state">The object associated with the operation.</param> /// <returns>An <see cref="IAsyncResult"/>.</returns> public IAsyncResult BeginSendMessage(AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, TimeSpan timeout, AsyncCallback callback, object state) { return this.BeginSendMessage(message, deliveryTag, txnId, timeout, CancellationToken.None, callback, state); } /// <summary> /// Ends the asynchronous send operation. /// </summary> /// <param name="result">The <see cref="IAsyncResult"/> returned by the begin method.</param> /// <returns>The delivery outcome.</returns> public Outcome EndSendMessage(IAsyncResult result) { return SendAsyncResult.End(result); } /// <summary> /// This method is not valid for a sending link. /// </summary> /// <param name="transfer"></param> /// <param name="delivery"></param> /// <returns></returns> protected override bool CreateDelivery(Transfer transfer, out Delivery delivery) { throw new NotImplementedException(); } /// <summary> /// Gets the number of deliveries that wait for link credits. /// </summary> public override uint Available { get { return (uint)this.pendingDeliveries.Count; } } /// <summary> /// Handles the state change of a delivery. /// </summary> /// <param name="delivery">The delivery whose state changed.</param> protected override void OnDisposeDeliveryInternal(Delivery delivery) { if (this.dispositionListener != null) { this.dispositionListener(delivery); } else { DeliveryState deliveryState = delivery.State; if (deliveryState.DescriptorCode == Received.Code) { return; } TransactionalState txnState = deliveryState as TransactionalState; if (txnState != null) { deliveryState = txnState.Outcome; } this.inflightSends.CompleteWork(delivery.DeliveryTag, false, (Outcome)deliveryState); } // Delivery is now settled with its final outcome and will not need to resumed. // Dispose and release buffers associated with the delivery. delivery.Dispose(); } /// <summary> /// This method is not valid for a sending link. /// </summary> /// <param name="delivery"></param> /// <param name="transfer"></param> /// <param name="frame"></param> protected override void OnProcessTransfer(Delivery delivery, Transfer transfer, Frame frame) { throw new AmqpException(AmqpErrorCode.NotAllowed, null); } /// <summary> /// Called when link credits are available. /// </summary> /// <param name="session">The session window.</param> /// <param name="link">The available link credits.</param> /// <param name="drain">true if the link is in drain mode, otherwise false.</param> /// <param name="txnId">The transaction id. It is ignored.</param> protected override void OnCreditAvailable(int session, uint link, bool drain, ArraySegment<byte> txnId) { if (this.LinkCredit > 0) { this.pendingDeliveries.ContinueWork(); } } /// <summary> /// Closes the link. /// </summary> /// <returns>true if the operation is completed, false otherwise.</returns> protected override bool CloseInternal() { this.AbortDeliveries(); return base.CloseInternal(); } /// <summary> /// Aborts the link. /// </summary> protected override void AbortInternal() { this.AbortDeliveries(); base.AbortInternal(); } /// <summary> /// Process and consolidate the unsettled deliveries sent with the remote Attach frame, by checking against the unsettled deliveries for this link terminus. /// </summary> /// <param name="remoteAttach">The incoming Attach from remote which contains the remote's unsettled delivery states.</param> protected override void ProcessUnsettledDeliveries(Attach remoteAttach) { if (this.Session.Connection.TerminusStore.TryGetLinkTerminusAsync(this.LinkIdentifier, out AmqpLinkTerminus linkTerminus).Result) { this.deliveriesToBeResentUponRecovery = Task.Run(() => linkTerminus.NegotiateUnsettledDeliveriesAsync(remoteAttach)).Result.Values; this.Opened += ResendDeliveriesOnOpen; } } static ArraySegment<byte> CreateTag() { return new ArraySegment<byte>(Guid.NewGuid().ToByteArray()); } static void OnRequestCredit(object state) { try { SendingAmqpLink thisPtr = (SendingAmqpLink)state; if (thisPtr.State == AmqpObjectState.OpenSent || thisPtr.State == AmqpObjectState.Opened) { thisPtr.SendFlow(true); } } catch (Exception exception) when (!Fx.IsFatal(exception)) { } } internal IAsyncResult BeginSendMessage(AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, TimeSpan timeout, CancellationToken cancellationToken, AsyncCallback callback, object state) { this.ThrowIfClosed(); if (this.dispositionListener != null) { throw new InvalidOperationException(CommonResources.DispositionListenerSetNotSupported); } message.ThrowIfDisposed(); if (message.Link != null) { throw new InvalidOperationException(AmqpResources.AmqpCannotResendMessage); } if (message.Serialize(false) == 0) { throw new InvalidOperationException(AmqpResources.AmqpEmptyMessageNotAllowed); } return new SendAsyncResult(this, message, deliveryTag, txnId, timeout, cancellationToken, callback, state); } void AbortDeliveries() { if (this.pendingDeliveries != null) { this.pendingDeliveries.Abort(); } if (this.inflightSends != null) { this.inflightSends.Abort(); } } void SendMessageInternal(AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId) { message.DeliveryTag = deliveryTag; message.Settled = this.Settings.SettleType == SettleMode.SettleOnSend; message.TxnId = txnId; this.pendingDeliveries.DoWork(message); } bool IWorkDelegate<AmqpMessage>.Invoke(AmqpMessage message) { DeliveryState deliveryState = message.State; if (deliveryState != null && deliveryState.DescriptorCode == Released.Code) { // message has been canceled (e.g. timed out) return true; } bool success = this.TrySendDelivery(message); if (!success && this.Session.State == AmqpObjectState.Opened && DateTime.UtcNow - this.lastFlowRequestTime >= MinRequestCreditWindow) { // Tell the other side that we have some messages to send this.lastFlowRequestTime = DateTime.UtcNow; ActionItem.Schedule(static s => OnRequestCredit(s), this); } return success; } static void ResendDeliveriesOnOpen(object sender, EventArgs eventArgs) { var thisPtr = (SendingAmqpLink)sender; thisPtr.Opened -= ResendDeliveriesOnOpen; foreach (Delivery delivery in thisPtr.deliveriesToBeResentUponRecovery) { thisPtr.ForceSendDelivery(delivery); } thisPtr.deliveriesToBeResentUponRecovery = null; } readonly struct SendMessageParam { public SendMessageParam(AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId) { this.Message = message; this.DeliveryTag = deliveryTag; this.TxnId = txnId; } public readonly AmqpMessage Message; public readonly ArraySegment<byte> DeliveryTag; public readonly ArraySegment<byte> TxnId; } sealed class SendAsyncResult : TimeoutAsyncResult<string>, IWork<Outcome> { readonly SendingAmqpLink link; readonly AmqpMessage message; readonly ArraySegment<byte> deliveryTag; readonly ArraySegment<byte> txnId; Outcome outcome; public SendAsyncResult( SendingAmqpLink link, AmqpMessage message, ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, TimeSpan timeout, CancellationToken cancellationToken, AsyncCallback callback, object state) : base(timeout, cancellationToken, callback, state) { this.link = link; this.message = message; this.deliveryTag = deliveryTag; this.txnId = txnId; this.link.inflightSends.StartWork(deliveryTag, this); } public static Outcome End(IAsyncResult result) { return AsyncResult.End<SendAsyncResult>(result).outcome; } public Outcome Outcome { get { return this.outcome; } } public void Start() { this.link.SendMessageInternal(this.message, this.deliveryTag, this.txnId); this.StartTracking(); } public void Done(bool completedSynchronously, Outcome outcome) { this.outcome = outcome; this.CompleteSelf(completedSynchronously); } public override void Cancel(bool isSynchronous) { if (this.CancelInflight()) { this.CompleteSelf(isSynchronous, new TaskCanceledException()); } } public void Cancel(bool completedSynchronously, Exception exception) { Exception completionException = exception; if (exception is OperationCanceledException && this.link.TerminalException != null) { // the link was canceled because of some termination exception. // Since the termination exception might not be caused // by this specific async result, we should keep throwing // OperationCancelledException (which is transient) but // use the TerminalException message to allow user to // debug. completionException = new OperationCanceledException(this.link.TerminalException.Message, this.link.TerminalException); } this.CompleteSelf(completedSynchronously, completionException); } protected override string Target { get { return "message"; } } protected override void CompleteOnTimer() { this.CancelInflight(); base.CompleteOnTimer(); } bool CancelInflight() { if (this.link.inflightSends.TryRemoveWork(this.deliveryTag, out _)) { // try to remove this delivery on the other side, note that the message may // still be sent and accepted by the broker already. This race is by design. if (this.message.Link != null) { this.link.DisposeDelivery(this.message, true, AmqpConstants.ReleasedOutcome); } else { this.message.State = AmqpConstants.ReleasedOutcome; } return true; } return false; } } } }