src/Proton/Engine/Implementation/ProtonConnection.cs (614 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Threading.Tasks; using Apache.Qpid.Proton.Buffer; using Apache.Qpid.Proton.Logging; using Apache.Qpid.Proton.Engine.Exceptions; using Apache.Qpid.Proton.Types; using Apache.Qpid.Proton.Types.Transport; using Microsoft.Extensions.Caching.Memory; namespace Apache.Qpid.Proton.Engine.Implementation { /// <summary> /// Implements the mechanics of managing a single AMQP connection associated /// with the provided engine instance. /// </summary> public sealed class ProtonConnection : ProtonEndpoint<IConnection>, IConnection, IHeaderHandler<ProtonEngine>, IPerformativeHandler<ProtonEngine> { private static readonly IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ProtonConnection>(); private readonly Open localOpen = new(); private Open remoteOpen; private AmqpHeader remoteHeader; private readonly IDictionary<ushort, ProtonSession> localSessions = new Dictionary<ushort, ProtonSession>(); private readonly IDictionary<ushort, ProtonSession> remoteSessions = new Dictionary<ushort, ProtonSession>(); // These would be sessions that were begun and ended before the remote ever // responded with a matching being and end. The remote is required to complete // these before answering a new begin sequence on the same local channel. private readonly MemoryCache zombieSessions = new(new MemoryCacheOptions()); private ConnectionState localState = ConnectionState.Idle; private ConnectionState remoteState = ConnectionState.Idle; private bool headerSent; private bool localOpenSent; private bool localCloseSent; private Action<AmqpHeader> remoteHeaderHandler; private Action<ISession> remoteSessionOpenEventHandler; private Action<ISender> remoteSenderOpenEventHandler; private Action<IReceiver> remoteReceiverOpenEventHandler; private Action<ITransactionManager> remoteTxnManagerOpenEventHandler; public ProtonConnection(ProtonEngine engine) : base(engine) { // This configures the default for the client which could later be made configurable // by adding an option in EngineConfiguration but for now this is forced set here. localOpen.MaxFrameSize = ProtonConstants.DefaultMaxAmqpFrameSize; } public override IConnection Open() { if (ConnectionState == ConnectionState.Idle) { engine.CheckShutdownOrFailed("Cannot open a connection when Engine is shutdown or failed."); localState = ConnectionState.Active; try { SyncLocalStateWithRemote(); } finally { FireLocalOpen(); } } return this; } public override IConnection Close() { if (ConnectionState == ConnectionState.Active) { localState = ConnectionState.Closed; try { engine.CheckFailed("Connection close called while engine ."); SyncLocalStateWithRemote(); } finally { foreach (ProtonSession session in AllSessions()) { session.HandleConnectionLocallyClosed(this); } FireLocalClose(); } } return this; } public IConnection Negotiate() { return Negotiate((header) => { LOG.Trace("Negotiation completed with remote returning AMQP Header: {}", header); }); } public IConnection Negotiate(in Action<AmqpHeader> remoteAMQPHeaderHandler) { if (remoteAMQPHeaderHandler == null) { throw new ArgumentNullException(nameof(remoteAMQPHeaderHandler), "Provided AMQP Header received handler cannot be null"); } CheckConnectionClosed("Cannot start header negotiation on a closed connection"); if (remoteHeader != null) { remoteAMQPHeaderHandler.Invoke(remoteHeader); } else { remoteHeaderHandler = remoteAMQPHeaderHandler; } SyncLocalStateWithRemote(); return this; } public long Tick(long current) { CheckConnectionClosed("Cannot call tick on an already closed Connection"); return engine.Tick(current); } public IConnection TickAuto(in TaskFactory taskFactory) { CheckConnectionClosed("Cannot call tickAuto on an already closed Connection"); engine.TickAuto(taskFactory); return this; } public ISession Session() { CheckConnectionClosed("Cannot create a Session from a Connection that is already closed"); ushort localChannel = FindFreeLocalChannel(); ProtonSession newSession = new(this, localChannel); localSessions.Add(localChannel, newSession); return newSession; } public ConnectionState ConnectionState => localState; public ConnectionState RemoteConnectionState => remoteState; public override bool IsLocallyOpen => ConnectionState == ConnectionState.Active; public override bool IsLocallyClosed => ConnectionState == ConnectionState.Closed; public override bool IsRemotelyOpen => RemoteConnectionState == ConnectionState.Active; public override bool IsRemotelyClosed => RemoteConnectionState == ConnectionState.Closed; public string ContainerId { get => localOpen.ContainerId; set { CheckNotOpened("Cannot set Container Id on already opened Connection"); localOpen.ContainerId = value; } } public string RemoteContainerId => remoteOpen?.ContainerId; public string Hostname { get => localOpen.Hostname; set { CheckNotOpened("Cannot set Host name on already opened Connection"); localOpen.Hostname = value; } } public string RemoteHostname => remoteOpen?.Hostname; public ushort ChannelMax { get => localOpen.ChannelMax; set { CheckNotOpened("Cannot set Channel Max on already opened Connection"); localOpen.ChannelMax = value; } } public ushort RemoteChannelMax => remoteOpen?.ChannelMax ?? 0; public uint MaxFrameSize { get => localOpen.MaxFrameSize; set { CheckNotOpened("Cannot set Max Frame Size on already opened Connection"); // We are specifically limiting max frame size to 2GB here as our buffers implementations // cannot handle anything larger so we must protect them from larger frames. if (value > int.MaxValue) { throw new ArgumentOutOfRangeException(string.Format( "Given max frame size value {0} larger than this implementations limit of {1}", value, int.MaxValue)); } localOpen.MaxFrameSize = value; } } public uint RemoteMaxFrameSize => remoteOpen?.MaxFrameSize ?? ProtonConstants.MinMaxAmqpFrameSize; public uint IdleTimeout { get => localOpen.IdleTimeout; set { CheckNotOpened("Cannot set Idle Timeout on already opened Connection"); localOpen.IdleTimeout = value; } } public uint RemoteIdleTimeout => remoteOpen?.IdleTimeout ?? 0; public override Symbol[] OfferedCapabilities { get => (Symbol[])(localOpen.OfferedCapabilities?.Clone()); set { CheckNotOpened("Cannot set offered capabilities on already opened Connection"); localOpen.OfferedCapabilities = (Symbol[])(value?.Clone()); } } public override Symbol[] RemoteOfferedCapabilities => (Symbol[])(remoteOpen?.OfferedCapabilities?.Clone()); public override Symbol[] DesiredCapabilities { get => (Symbol[])(localOpen.DesiredCapabilities?.Clone()); set { CheckNotOpened("Cannot set desired capabilities on already opened Connection"); localOpen.DesiredCapabilities = (Symbol[])(value?.Clone()); } } public override Symbol[] RemoteDesiredCapabilities => (Symbol[])(remoteOpen?.DesiredCapabilities?.Clone()); public override IReadOnlyDictionary<Symbol, object> Properties { get { if (localOpen.Properties != null) { return new Dictionary<Symbol, object>(localOpen.Properties); } else { return null; } } set { CheckNotOpened("Cannot set Properties on already opened Connection"); if (value != null && value.Count > 0) { localOpen.Properties = new Dictionary<Symbol, object>(value); } } } public override IReadOnlyDictionary<Symbol, object> RemoteProperties { get { if (remoteOpen != null && remoteOpen.Properties != null) { return new Dictionary<Symbol, object>(remoteOpen.Properties); } else { return null; } } } public ICollection<ISession> Sessions { get { ISet<ISession> result; if (localSessions.Count == 0 && remoteSessions.Count == 0) { result = new HashSet<ISession>(); } else { result = new HashSet<ISession>(localSessions.Values); foreach (ProtonSession session in remoteSessions.Values) { result.Add(session); } } return result; } } public IConnection SessionOpenedHandler(Action<ISession> handler) { this.remoteSessionOpenEventHandler = handler; return this; } public IConnection ReceiverOpenedHandler(Action<IReceiver> handler) { this.remoteReceiverOpenEventHandler = handler; return this; } public IConnection SenderOpenedHandler(Action<ISender> handler) { this.remoteSenderOpenEventHandler = handler; return this; } public IConnection TransactionManagerOpenedHandler(Action<ITransactionManager> handler) { this.remoteTxnManagerOpenEventHandler = handler; return this; } #region Event Handlers for AMQP Performatives public void HandleAMQPHeader(AmqpHeader header, ProtonEngine context) { remoteHeader = header; if (remoteHeaderHandler != null) { remoteHeaderHandler.Invoke(remoteHeader); remoteHeaderHandler = null; } SyncLocalStateWithRemote(); } public void HandleSASLHeader(AmqpHeader header, ProtonEngine context) { context.EngineFailed(new ProtocolViolationException("Received unexpected SASL Header")); } public void HandleOpen(Open open, IProtonBuffer payload, ushort channel, ProtonEngine context) { if (remoteOpen != null) { context.EngineFailed(new ProtocolViolationException("Received second Open for Connection from remote")); return; } remoteState = ConnectionState.Active; remoteOpen = open; FireRemoteOpen(); } public void HandleClose(Close close, IProtonBuffer payload, ushort channel, ProtonEngine context) { remoteState = ConnectionState.Closed; RemoteErrorCondition = close.Error?.Copy(); foreach (ProtonSession session in AllSessions()) { session.HandleConnectionRemotelyClosed(this); } FireRemoteClose(); } public void HandleBegin(Begin begin, IProtonBuffer payload, ushort channel, ProtonEngine context) { ProtonSession session; if (channel > localOpen.ChannelMax) { ErrorCondition = new ErrorCondition(ConnectionError.FRAMING_ERROR, "Channel Max Exceeded for session Begin"); Close(); } else if (remoteSessions.ContainsKey(channel)) { context.EngineFailed(new ProtocolViolationException("Received second begin for Session from remote")); } else { // If there is a remote channel then this is an answer to a local open of a session, otherwise // the remote is requesting a new session and we need to create one and signal that a remote // session was opened. if (begin.HasRemoteChannel()) { ushort localSessionChannel = begin.RemoteChannel; if (!localSessions.TryGetValue(localSessionChannel, out session)) { // If there is a session that was begun and ended before remote responded we // expect that this exchange refers to that session and proceed as though the // remote is going to begin and end it now (as it should). The alternative is // that the remote is doing something not compliant with the specification and // we fail the engine to indicate this. if (zombieSessions.TryGetValue(localSessionChannel, out session)) { if (session != null) { // The session will now get tracked as a remote session and the next // end will take care of normal remote session cleanup. zombieSessions.Remove(localSessionChannel); } else { // The session was reclaimed by GC and we retain the fact that it was // here so that the end that should be following doesn't result in an // engine failure. return; } } else { ErrorCondition = new ErrorCondition(AmqpError.PRECONDITION_FAILED, "No matching session found for remote channel given"); Close(); engine.EngineFailed(new ProtocolViolationException("Received uncorrelated channel on Begin from remote: " + localSessionChannel)); return; } } } else { session = (ProtonSession)Session(); } remoteSessions.Add(channel, session); // Let the session handle the remote Begin now. session.RemoteBegin(begin, channel); // If the session was initiated remotely then we signal the creation to the any registered // remote session event handler if (session.State == SessionState.Idle) { remoteSessionOpenEventHandler?.Invoke(session); } } } public void HandleEnd(End end, IProtonBuffer payload, ushort channel, ProtonEngine context) { if (remoteSessions.TryGetValue(channel, out ProtonSession session)) { remoteSessions.Remove(channel); session.RemoteEnd(end, channel); } else { // Check that we don't have a lingering session that was opened and closed locally for // which the remote is finally getting round to ending but we lost the session instance // due to it being cleaned up by GC, if (zombieSessions.TryGetValue(channel, out _)) { zombieSessions.Remove(channel); } else { engine.EngineFailed(new ProtocolViolationException("Received uncorrelated channel on End from remote: " + channel)); } } } public void HandleAttach(Attach attach, IProtonBuffer payload, ushort channel, ProtonEngine context) { if (remoteSessions.TryGetValue(channel, out ProtonSession session)) { session.RemoteAttach(attach, channel); } else { engine.EngineFailed(new ProtocolViolationException("Received uncorrelated channel on Attach from remote: " + channel)); } } public void HandleDetach(Detach detach, IProtonBuffer payload, ushort channel, ProtonEngine context) { if (remoteSessions.TryGetValue(channel, out ProtonSession session)) { session.RemoteDetach(detach, channel); } else { engine.EngineFailed(new ProtocolViolationException("Received uncorrelated channel on Detach from remote: " + channel)); } } public void HandleFlow(Flow flow, IProtonBuffer payload, ushort channel, ProtonEngine context) { if (remoteSessions.TryGetValue(channel, out ProtonSession session)) { session.RemoteFlow(flow, channel); } else { engine.EngineFailed(new ProtocolViolationException("Received uncorrelated channel on Flow from remote: " + channel)); } } public void HandleTransfer(Transfer transfer, IProtonBuffer payload, ushort channel, ProtonEngine context) { if (remoteSessions.TryGetValue(channel, out ProtonSession session)) { session.RemoteTransfer(transfer, payload, channel); } else { engine.EngineFailed(new ProtocolViolationException("Received uncorrelated channel on Transfer from remote: " + channel)); } } public void HandleDisposition(Disposition disposition, IProtonBuffer payload, ushort channel, ProtonEngine context) { if (remoteSessions.TryGetValue(channel, out ProtonSession session)) { session.RemoteDisposition(disposition, channel); } else { engine.EngineFailed(new ProtocolViolationException("Received uncorrelated channel on Disposition from remote: " + channel)); } } #endregion #region Internal and Private Connection APIs internal bool HasReceiverOpenEventHandler => remoteReceiverOpenEventHandler != null; internal bool HasSenderOpenEventHandler => remoteSenderOpenEventHandler != null; internal bool HasTransactionManagerOpenHandler => remoteTxnManagerOpenEventHandler != null; internal void FireRemoteReceiverOpened(IReceiver receiver) { remoteReceiverOpenEventHandler?.Invoke(receiver); } internal void FireRemoteSenderOpened(ISender sender) { remoteSenderOpenEventHandler?.Invoke(sender); } internal void FireRemoteTransactionManagerOpened(ITransactionManager manager) { remoteTxnManagerOpenEventHandler?.Invoke(manager); } internal void HandleEngineStarted(ProtonEngine protonEngine) { SyncLocalStateWithRemote(); } internal void HandleEngineShutdown(ProtonEngine protonEngine) { try { FireEngineShutdown(); } catch (Exception) { } foreach (ProtonSession session in AllSessions()) { session.HandleEngineShutdown(protonEngine); } } internal void HandleEngineFailed(ProtonEngine protonEngine, Exception cause) { if (localOpenSent && !localCloseSent) { localCloseSent = true; try { if (ErrorCondition == null) { ErrorCondition = ErrorConditionFromFailureCause(cause); } Close forcedClose = new() { Error = ErrorCondition }; engine.FireWrite(forcedClose, 0); } catch (Exception) { } } } internal override IConnection Self() { return this; } private void CheckNotOpened(string errorMessage) { if (localState > ConnectionState.Idle) { throw new InvalidOperationException(errorMessage); } } private void CheckConnectionClosed(string errorMessage) { if (IsLocallyClosed || IsRemotelyClosed) { throw new InvalidOperationException(errorMessage); } } private ISet<ProtonSession> AllSessions() { ISet<ProtonSession> result; if (localSessions.Count == 0 && remoteSessions.Count == 0) { result = new HashSet<ProtonSession>(); } else { result = new HashSet<ProtonSession>(localSessions.Values); foreach (ProtonSession session in remoteSessions.Values) { result.Add(session); } } return result; } private void SyncLocalStateWithRemote() { if (engine.IsWritable) { // When the engine state changes or we have read an incoming AMQP header etc we need to check // if we have pending work to send and do so if (headerSent) { ConnectionState state = ConnectionState; // Once an incoming header arrives we can emit our open if locally opened and also send close if // that is what our state is already. if (state != ConnectionState.Idle && remoteHeader != null) { bool resourceSyncNeeded = false; if (!localOpenSent && !engine.IsShutdown) { engine.FireWrite(localOpen, 0); engine.RecomputeEffectiveFrameSizeLimits(); localOpenSent = true; resourceSyncNeeded = true; } if (IsLocallyClosed && !localCloseSent && !engine.IsShutdown) { Close localClose = new() { Error = ErrorCondition }; engine.FireWrite(localClose, 0); localCloseSent = true; resourceSyncNeeded = false; // Session resources can't write anything now } if (resourceSyncNeeded) { foreach (ProtonSession session in AllSessions()) { session.TrySyncLocalStateWithRemote(); } } } } else if (remoteHeader != null || ConnectionState == ConnectionState.Active || remoteHeaderHandler != null) { headerSent = true; engine.FireWrite(HeaderEnvelope.AMQP_HEADER_ENVELOPE); } } } private static ErrorCondition ErrorConditionFromFailureCause(Exception cause) { Symbol condition; string description = cause.Message; if (cause is ProtocolViolationException error) { condition = error.ErrorCondition; } else { condition = AmqpError.INTERNAL_ERROR; } return new ErrorCondition(condition, description); } private ushort FindFreeLocalChannel() { for (ushort i = 0; i <= localOpen.ChannelMax; ++i) { if (!localSessions.ContainsKey(i) && !zombieSessions.TryGetValue(i, out _)) { return i; } } // We didn't find one that isn't free and also not awaiting remote begin / end // so just use an overlap as it should complete in order unless the remote has // completely ignored the specification and or gone of the rails. for (ushort i = 0; i <= localOpen.ChannelMax; ++i) { if (!localSessions.ContainsKey(i)) { return i; } } throw new InvalidOperationException("no local channel available for allocation"); } internal void FreeLocalChannel(ushort localChannel) { ProtonSession session = localSessions[localChannel]; localSessions.Remove(localChannel); if (session.RemoteState == SessionState.Idle) { // The remote hasn't answered our begin yet so we need to hold onto this information // and process the eventual begin that must be provided per specification. zombieSessions.Set(localChannel, session); } } internal bool WasHeaderSent => this.headerSent; internal bool WasLocalOpenSent => this.localOpenSent; internal bool WasLocalCloseSent => this.localCloseSent; #endregion } }