sdk/servicebus/Microsoft.Azure.ServiceBus/src/ServiceBusConnection.cs (227 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.Azure.ServiceBus
{
    using System;
    using System.Net;
    using System.Threading.Tasks;
    using Microsoft.Azure.Amqp;
    using Microsoft.Azure.Amqp.Framing;
    using Microsoft.Azure.Amqp.Transaction;
    using Microsoft.Azure.Amqp.Transport;
    using Microsoft.Azure.ServiceBus.Amqp;
    using Microsoft.Azure.ServiceBus.Primitives;

    /// <summary>
    /// Connection object to service bus namespace
    /// </summary>
    public class ServiceBusConnection
    {
        static readonly Version AmqpVersion = new Version(1, 0, 0, 0);

        // Guards isClosedOrClosing; also used by CloseAsync to make the close transition happen once.
        readonly object syncLock;
        bool isClosedOrClosing;

        /// <summary>
        /// Creates a new connection to service bus.
        /// </summary>
        /// <param name="connectionStringBuilder"><see cref="ServiceBusConnectionStringBuilder"/> having namespace information.</param>
        /// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsync"/></remarks>
        public ServiceBusConnection(ServiceBusConnectionStringBuilder connectionStringBuilder)
            : this(connectionStringBuilder?.GetNamespaceConnectionString())
        {
        }

        /// <summary>
        /// Creates a new connection to service bus.
        /// </summary>
        /// <param name="namespaceConnectionString">Namespace connection string</param>
        /// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsync"/></remarks>
        public ServiceBusConnection(string namespaceConnectionString)
            : this(namespaceConnectionString, RetryPolicy.Default)
        {
        }

        /// <summary>
        /// Creates a new connection to service bus.
        /// </summary>
        /// <param name="namespaceConnectionString">Namespace connection string.</param>
        /// <param name="retryPolicy">Retry policy for operations. Defaults to <see cref="RetryPolicy.Default"/></param>
        /// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsync"/></remarks>
        public ServiceBusConnection(string namespaceConnectionString, RetryPolicy retryPolicy = null)
            : this(retryPolicy)
        {
            this.InitializeConnection(ParseNamespaceConnectionString(namespaceConnectionString));
        }

        /// <summary>
        /// Creates a new connection to service bus.
        /// </summary>
        /// <param name="namespaceConnectionString">Namespace connection string.</param>
        /// <param name="operationTimeout">Duration after which individual operations will timeout.</param>
        /// <param name="retryPolicy">Retry policy for operations. Defaults to <see cref="RetryPolicy.Default"/></param>
        /// <remarks>It is the responsibility of the user to close the connection after use through <see cref="CloseAsync"/></remarks>
        [Obsolete("This constructor is obsolete. Use ServiceBusConnection(string namespaceConnectionString, RetryPolicy retryPolicy) constructor instead, providing operationTimeout in the connection string.")]
        public ServiceBusConnection(string namespaceConnectionString, TimeSpan operationTimeout, RetryPolicy retryPolicy = null)
            : this(retryPolicy)
        {
            this.InitializeConnection(ParseNamespaceConnectionString(namespaceConnectionString));

            // operationTimeout argument explicitly provided by caller should take precedence over OperationTimeout found in the connection string.
            this.OperationTimeout = operationTimeout;
        }

        /// <summary>
        /// Creates a new connection to service bus.
        /// </summary>
        /// <param name="endpoint">Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.windows.net</param>
        /// <param name="transportType">Transport type.</param>
        /// <param name="retryPolicy">Retry policy for operations. Defaults to <see cref="RetryPolicy.Default"/></param>
        public ServiceBusConnection(string endpoint, TransportType transportType, RetryPolicy retryPolicy = null)
            : this(retryPolicy)
        {
            if (string.IsNullOrWhiteSpace(endpoint))
            {
                throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(endpoint));
            }

            var serviceBusConnectionStringBuilder = new ServiceBusConnectionStringBuilder()
            {
                Endpoint = endpoint,
                TransportType = transportType
            };

            this.InitializeConnection(serviceBusConnectionStringBuilder);
        }

        /// <summary>
        /// Sets the retry policy and the lock object only. Endpoint, token provider, timeouts and the
        /// AMQP connection/controller managers are left unset; the public constructors assign them
        /// afterwards via <c>InitializeConnection</c>.
        /// </summary>
        /// <param name="retryPolicy">Retry policy for operations. Defaults to <see cref="RetryPolicy.Default"/></param>
        internal ServiceBusConnection(RetryPolicy retryPolicy = null)
        {
            this.RetryPolicy = retryPolicy ?? RetryPolicy.Default;
            this.syncLock = new object();
        }

        /// <summary>
        /// Fully qualified domain name for Service Bus.
        /// </summary>
        public Uri Endpoint { get; set; }

        /// <summary>
        /// OperationTimeout is applied in erroneous situations to notify the caller about the relevant <see cref="ServiceBusException"/>
        /// </summary>
        /// <remarks>Defaults to 1 minute.</remarks>
        public TimeSpan OperationTimeout { get; set; }

        /// <summary>
        /// ConnectionIdleTimeout is applied when a connection receives no traffic for a certain period of time.
        /// In some situations, underlying transport layer takes really long time to detect a socket error and it may
        /// cause some operations to time out after a long wait. This timeout helps to detect and close an idle connection
        /// without waiting for the network layer to identify a socket error. But this setting may result in some increased
        /// heartbeat traffic between the client and service.
        /// </summary>
        /// <remarks>Defaults to null</remarks>
        public TimeSpan? ConnectionIdleTimeout { get; set; }

        /// <summary>
        /// Retry policy for operations performed on the connection.
        /// </summary>
        /// <remarks>Defaults to <see cref="RetryPolicy.Default"/></remarks>
        public RetryPolicy RetryPolicy { get; set; }

        /// <summary>
        /// Get the transport type from the connection string.
        /// </summary>
        /// <remarks>Available options: Amqp and AmqpWebSockets.</remarks>
        public TransportType TransportType { get; set; }

        /// <summary>
        /// Token provider for authentication. <see cref="TokenProvider"/>
        /// </summary>
        public ITokenProvider TokenProvider { get; set; }

        /// <summary>
        /// Returns true if the Service Bus Connection is closed or closing.
        /// </summary>
        public bool IsClosedOrClosing
        {
            get
            {
                lock (this.syncLock)
                {
                    return this.isClosedOrClosing;
                }
            }

            internal set
            {
                lock (this.syncLock)
                {
                    this.isClosedOrClosing = value;
                }
            }
        }

        internal FaultTolerantAmqpObject<AmqpConnection> ConnectionManager { get; set; }

        internal FaultTolerantAmqpObject<Controller> TransactionController { get; set; }

        /// <summary>
        /// Throws an <see cref="ObjectDisposedException"/> if the connection is closed or closing.
        /// </summary>
        /// <exception cref="ObjectDisposedException">Thrown when <see cref="IsClosedOrClosing"/> is true.</exception>
        internal virtual void ThrowIfClosed()
        {
            if (this.IsClosedOrClosing)
            {
                // NOTE(review): the single-string ObjectDisposedException constructor takes an object name,
                // so this text surfaces as the object name inside the framework's standard message.
                // Kept unchanged so the observable message stays the same.
                throw new ObjectDisposedException($"{nameof(ServiceBusConnection)} has already been closed. Please create a new instance");
            }
        }

        /// <summary>
        /// Closes the connection.
        /// </summary>
        /// <returns>A task that completes once the underlying connection manager has been closed.
        /// Calls made after the first one complete without doing anything.</returns>
        public async Task CloseAsync()
        {
            var callClose = false;

            // Flip the flag under the lock so that only the first caller performs the actual close.
            lock (this.syncLock)
            {
                if (!this.IsClosedOrClosing)
                {
                    this.IsClosedOrClosing = true;
                    callClose = true;
                }
            }

            if (callClose)
            {
                await this.ConnectionManager.CloseAsync().ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Validates a namespace-level connection string and parses it into a builder.
        /// </summary>
        /// <param name="namespaceConnectionString">Connection string that must not be blank and must not contain an EntityPath.</param>
        /// <returns>The builder created from <paramref name="namespaceConnectionString"/>.</returns>
        static ServiceBusConnectionStringBuilder ParseNamespaceConnectionString(string namespaceConnectionString)
        {
            if (string.IsNullOrWhiteSpace(namespaceConnectionString))
            {
                throw Fx.Exception.ArgumentNullOrWhiteSpace(nameof(namespaceConnectionString));
            }

            var serviceBusConnectionStringBuilder = new ServiceBusConnectionStringBuilder(namespaceConnectionString);
            if (!string.IsNullOrWhiteSpace(serviceBusConnectionStringBuilder.EntityPath))
            {
                throw Fx.Exception.Argument(nameof(namespaceConnectionString), "NamespaceConnectionString should not contain EntityPath.");
            }

            return serviceBusConnectionStringBuilder;
        }

        /// <summary>
        /// Copies endpoint, authentication and timeout settings from <paramref name="builder"/> and creates
        /// the fault tolerant wrappers for the AMQP connection and the transaction controller.
        /// </summary>
        /// <param name="builder">Source of the connection settings.</param>
        void InitializeConnection(ServiceBusConnectionStringBuilder builder)
        {
            this.Endpoint = new Uri(builder.Endpoint);

            // Token provider precedence: SAS token, then SAS key name/key, then managed identity.
            // If none applies, TokenProvider is left as it was.
            if (builder.SasToken != null)
            {
                this.TokenProvider = new SharedAccessSignatureTokenProvider(builder.SasToken);
            }
            else if (builder.SasKeyName != null || builder.SasKey != null)
            {
                this.TokenProvider = new SharedAccessSignatureTokenProvider(builder.SasKeyName, builder.SasKey);
            }
            else if (builder.Authentication.Equals(ServiceBusConnectionStringBuilder.AuthenticationType.ManagedIdentity))
            {
                this.TokenProvider = new ManagedIdentityTokenProvider();
            }

            this.OperationTimeout = builder.OperationTimeout;
            this.TransportType = builder.TransportType;
            this.ConnectionIdleTimeout = builder.ConnectionIdleTimeout;
            this.ConnectionManager = new FaultTolerantAmqpObject<AmqpConnection>(this.CreateConnectionAsync, CloseConnection);
            this.TransactionController = new FaultTolerantAmqpObject<Controller>(this.CreateControllerAsync, CloseController);
        }

        /// <summary>
        /// Close callback handed to the connection manager: logs the close event and closes the connection.
        /// </summary>
        static void CloseConnection(AmqpConnection connection)
        {
            MessagingEventSource.Log.AmqpConnectionClosed(connection);
            connection.SafeClose();
        }

        /// <summary>
        /// Close callback handed to the transaction controller manager.
        /// </summary>
        static void CloseController(Controller controller)
        {
            controller.Close();
        }

        /// <summary>
        /// Connects the transport, opens an AMQP connection over it and makes sure a CBS link is
        /// registered in the connection's extensions.
        /// </summary>
        /// <param name="timeout">Total time budget shared by the transport connect and the connection open.</param>
        /// <returns>The opened <see cref="AmqpConnection"/>.</returns>
        async Task<AmqpConnection> CreateConnectionAsync(TimeSpan timeout)
        {
            var hostName = this.Endpoint.Host;

            var timeoutHelper = new TimeoutHelper(timeout, true);
            var amqpSettings = AmqpConnectionHelper.CreateAmqpSettings(
                amqpVersion: AmqpVersion,
                useSslStreamSecurity: true,
                hasTokenProvider: true,
                useWebSockets: TransportType == TransportType.AmqpWebSockets);

            var transportSettings = CreateTransportSettings();
            var amqpTransportInitiator = new AmqpTransportInitiator(amqpSettings, transportSettings);
            var transport = await amqpTransportInitiator.ConnectTaskAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);

            var containerId = Guid.NewGuid().ToString();
            var amqpConnectionSettings = AmqpConnectionHelper.CreateAmqpConnectionSettings(AmqpConstants.DefaultMaxFrameSize, containerId, hostName);

            // Only a strictly positive idle timeout is applied; null or non-positive values leave the setting untouched.
            if (this.ConnectionIdleTimeout.HasValue && this.ConnectionIdleTimeout.Value > TimeSpan.Zero)
            {
                // checked: a value that does not fit in a uint raises OverflowException instead of wrapping silently.
                uint timeOutInMillis = checked((uint)this.ConnectionIdleTimeout.Value.TotalMilliseconds);
                amqpConnectionSettings.IdleTimeOut = timeOutInMillis;
            }

            var connection = new AmqpConnection(transport, amqpSettings, amqpConnectionSettings);
            await connection.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);

            // Always create the CBS Link + Session
            // NOTE(review): the link is constructed before the lookup on purpose; presumably some versions of
            // the AMQP library register the link from its constructor - confirm before reordering these statements.
            var cbsLink = new AmqpCbsLink(connection);
            if (connection.Extensions.Find<AmqpCbsLink>() == null)
            {
                connection.Extensions.Add(cbsLink);
            }

            MessagingEventSource.Log.AmqpConnectionCreated(hostName, connection);

            return connection;
        }

        /// <summary>
        /// Opens a new session on the managed connection and a transaction <see cref="Controller"/> on that session.
        /// </summary>
        /// <param name="timeout">Total time budget shared by acquiring the connection and opening the session and controller.</param>
        /// <returns>The opened <see cref="Controller"/>.</returns>
        /// <remarks>
        /// When opening the session or the controller fails, the failure is logged, the session is closed on a
        /// best-effort basis, and the original exception is rethrown.
        /// </remarks>
        async Task<Controller> CreateControllerAsync(TimeSpan timeout)
        {
            var timeoutHelper = new TimeoutHelper(timeout, true);
            var connection = await this.ConnectionManager.GetOrCreateAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);

            var sessionSettings = new AmqpSessionSettings { Properties = new Fields() };
            AmqpSession amqpSession = null;
            Controller controller;

            try
            {
                amqpSession = connection.CreateSession(sessionSettings);
                await amqpSession.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);

                controller = new Controller(amqpSession, timeoutHelper.RemainingTime());
                await controller.OpenAsync(timeoutHelper.RemainingTime()).ConfigureAwait(false);
            }
            catch (Exception exception)
            {
                // Log before cleaning up so the root cause is recorded even when the cleanup itself fails.
                MessagingEventSource.Log.AmqpCreateControllerException(this.ConnectionManager.ToString(), exception);

                if (amqpSession != null)
                {
                    try
                    {
                        await amqpSession.CloseAsync(timeout).ConfigureAwait(false);
                    }
                    catch (Exception)
                    {
                        // Closing is best-effort: a failure here must not replace the original exception.
                        // Abort so the session is not left half-closed.
                        amqpSession.Abort();
                    }
                }

                throw;
            }

            return controller;
        }

        /// <summary>
        /// Builds transport settings for the configured <see cref="TransportType"/>: web socket settings
        /// (using <see cref="WebRequest.DefaultWebProxy"/>) for AmqpWebSockets, TCP settings with SSL otherwise.
        /// </summary>
        TransportSettings CreateTransportSettings()
        {
            // Both the network host and the logical host name come from the endpoint's host.
            var hostName = this.Endpoint.Host;
            var networkHost = this.Endpoint.Host;
            var port = this.Endpoint.Port;

            if (TransportType == TransportType.AmqpWebSockets)
            {
                return AmqpConnectionHelper.CreateWebSocketTransportSettings(
                    networkHost: networkHost,
                    hostName: hostName,
                    port: port,
                    proxy: WebRequest.DefaultWebProxy);
            }

            return AmqpConnectionHelper.CreateTcpTransportSettings(
                networkHost: networkHost,
                hostName: hostName,
                port: port,
                useSslStreamSecurity: true);
        }
    }
}