src/Net/ResourceManager.cs (177 lines of code) (raw):

// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------

namespace Amqp.Transactions
{
#if NETFX || NETFX40 || NETSTANDARD2_0
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using System.Transactions;
    using Amqp;
    using Amqp.Framing;

    /// <summary>
    /// Singleton that maps ambient <see cref="Transaction"/> objects to AMQP transactions.
    /// It keeps one <see cref="Enlistment"/> per transaction (keyed by the transaction's
    /// local identifier) and one <see cref="Controller"/> per <see cref="Connection"/>.
    /// </summary>
    sealed class ResourceManager
    {
        static readonly ResourceManager instance = new ResourceManager();
        readonly Dictionary<Connection, Controller> controllers;
        readonly Dictionary<string, Enlistment> enlistments;

        ResourceManager()
        {
            this.controllers = new Dictionary<Connection, Controller>();
            this.enlistments = new Dictionary<string, Enlistment>(StringComparer.OrdinalIgnoreCase);
        }

        /// <summary>
        /// Lock object guarding both <see cref="controllers"/> and <see cref="enlistments"/>.
        /// </summary>
        object SyncRoot
        {
            get { return this.enlistments; }
        }

        /// <summary>
        /// Returns a transactional state carrying the AMQP transaction id that corresponds
        /// to <see cref="Transaction.Current"/>, enlisting in that transaction if needed.
        /// </summary>
        /// <param name="link">The link whose connection hosts the transaction controller.</param>
        /// <returns>
        /// A <see cref="TransactionalState"/> with its TxnId set, or null when there is no
        /// ambient transaction.
        /// </returns>
        public static async Task<TransactionalState> GetTransactionalStateAsync(Link link)
        {
            Transaction txn = Transaction.Current;
            if (txn != null)
            {
                byte[] txnId = await instance.EnlistAsync(link, txn).ConfigureAwait(false);
                return new TransactionalState() { TxnId = txnId };
            }

            return null;
        }

        /// <summary>
        /// Finds or creates the enlistment for <paramref name="txn"/> and returns the AMQP
        /// transaction id declared for it.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// The transaction refused the promotable single phase enlistment.
        /// </exception>
        async Task<byte[]> EnlistAsync(Link link, Transaction txn)
        {
            string id = txn.TransactionInformation.LocalIdentifier;
            Enlistment enlistment;

            lock (this.SyncRoot)
            {
                if (!this.enlistments.TryGetValue(id, out enlistment))
                {
                    enlistment = new Enlistment(this, txn);
                    this.enlistments.Add(id, enlistment);
                    txn.TransactionCompleted += this.OnTransactionCompleted;

                    // Undo the registration both when the enlistment is refused and when
                    // the call throws, so a failed enlistment is never cached and the
                    // completion handler is not left attached.
                    bool enlisted = false;
                    try
                    {
                        enlisted = txn.EnlistPromotableSinglePhase(enlistment);
                    }
                    finally
                    {
                        if (!enlisted)
                        {
                            this.enlistments.Remove(id);
                            txn.TransactionCompleted -= this.OnTransactionCompleted;
                        }
                    }

                    if (!enlisted)
                    {
                        throw new InvalidOperationException("DTC not supported");
                    }
                }
            }

            return await enlistment.EnlistAsync(link).ConfigureAwait(false);
        }

        /// <summary>
        /// Drops the enlistment of a transaction once that transaction has completed.
        /// </summary>
        void OnTransactionCompleted(object sender, TransactionEventArgs e)
        {
            lock (this.SyncRoot)
            {
                this.enlistments.Remove(e.Transaction.TransactionInformation.LocalIdentifier);
            }
        }

        /// <summary>
        /// Returns the controller registered for the link's connection, creating it on a
        /// new session of that connection when none exists yet.
        /// </summary>
        Controller GetOrCreateController(Link link)
        {
            Controller controller;
            lock (this.SyncRoot)
            {
                if (!this.controllers.TryGetValue(link.Session.Connection, out controller))
                {
                    Session session = new Session(link.Session.Connection);
                    controller = new Controller(session);
                    controller.Closed += this.OnControllerClosed;
                    this.controllers.Add(link.Session.Connection, controller);
                }
            }

            return controller;
        }

        /// <summary>
        /// Unregisters a closed controller and, if it was still registered, closes the
        /// session that was created for it.
        /// </summary>
        void OnControllerClosed(IAmqpObject obj, Error error)
        {
            var controller = (Controller)obj;
            bool removed;
            lock (this.SyncRoot)
            {
                removed = this.controllers.Remove(controller.Session.Connection);
            }

            if (removed)
            {
                // NOTE(review): the meaning of the 0 argument is defined by
                // Session.CloseInternal, which is not visible here.
                controller.Session.CloseInternal(0);
            }
        }

        /// <summary>
        /// Per-transaction enlistment. Declares the AMQP transaction on first use and
        /// discharges it when the enlisted transaction commits or rolls back.
        /// </summary>
        class Enlistment : IPromotableSinglePhaseNotification
        {
            // Not referenced anywhere in this class.
            static readonly TimeSpan rollbackTimeout = TimeSpan.FromMinutes(1);
            readonly ResourceManager owner;
            readonly Transaction transaction;
            readonly string transactionId;
            readonly object syncRoot;
            Controller controller;
            Task<byte[]> declareTask;
            byte[] txnid;

            public Enlistment(ResourceManager owner, Transaction transaction)
            {
                this.owner = owner;
                this.transaction = transaction;
                this.transactionId = this.transaction.TransactionInformation.LocalIdentifier;
                this.syncRoot = new object();
            }

            /// <summary>
            /// Returns the AMQP transaction id, declaring the transaction through the
            /// controller of the link's connection on the first call. Concurrent callers
            /// share the same declare task.
            /// </summary>
            public async Task<byte[]> EnlistAsync(Link link)
            {
                if (this.txnid != null)
                {
                    return this.txnid;
                }

                lock (this.syncRoot)
                {
                    if (this.declareTask == null)
                    {
                        this.controller = this.owner.GetOrCreateController(link);
                        this.declareTask = this.controller.DeclareAsync();
                    }
                }

                return this.txnid = await this.declareTask.ConfigureAwait(false);
            }

            void IPromotableSinglePhaseNotification.Initialize()
            {
            }

            /// <summary>
            /// Discharges the AMQP transaction with the fail flag set.
            /// </summary>
            void IPromotableSinglePhaseNotification.Rollback(SinglePhaseEnlistment singlePhaseEnlistment)
            {
                this.Discharge(singlePhaseEnlistment, true);
            }

            /// <summary>
            /// Discharges the AMQP transaction with the fail flag cleared.
            /// </summary>
            void IPromotableSinglePhaseNotification.SinglePhaseCommit(SinglePhaseEnlistment singlePhaseEnlistment)
            {
                this.Discharge(singlePhaseEnlistment, false);
            }

            /// <summary>
            /// Always throws: promotion to a distributed transaction is not supported.
            /// </summary>
            byte[] ITransactionPromoter.Promote()
            {
                throw new TransactionPromotionException("DTC not supported");
            }

            /// <summary>
            /// Shared implementation of commit and rollback. Discharges the declared AMQP
            /// transaction and reports the result to <paramref name="singlePhaseEnlistment"/>.
            /// </summary>
            /// <param name="singlePhaseEnlistment">The enlistment to signal with the outcome.</param>
            /// <param name="fail">true to roll back, false to commit.</param>
            void Discharge(SinglePhaseEnlistment singlePhaseEnlistment, bool fail)
            {
                lock (this.syncRoot)
                {
                    if (this.txnid != null)
                    {
                        this.controller.DischargeAsync(this.txnid, fail).ContinueWith(
                            (t, o) =>
                            {
                                var spe = (SinglePhaseEnlistment)o;
                                if (t.IsFaulted)
                                {
                                    spe.Aborted(t.Exception.InnerException);
                                }
                                else
                                {
                                    spe.Done();
                                }
                            },
                            singlePhaseEnlistment);
                        return;
                    }
                }

                // No AMQP transaction id is available (the declare failed or has not
                // completed), so there is nothing to discharge. The enlistment must still
                // be signalled, otherwise the transaction never receives an outcome from
                // this participant. Signal outside the lock to avoid calling back into
                // System.Transactions while holding it.
                if (fail)
                {
                    singlePhaseEnlistment.Done();
                }
                else
                {
                    singlePhaseEnlistment.Aborted(
                        new InvalidOperationException("The AMQP transaction was not declared."));
                }
            }
        }
    }
#endif
}