src/Transport/Mock/MockTransport.cs (355 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Apache.NMS.ActiveMQ.Commands; using Apache.NMS.ActiveMQ.Threads; using Apache.NMS.ActiveMQ.Util.Synchronization; using Task = Apache.NMS.ActiveMQ.Threads.Task; #if NETCF using ThreadInterruptedException = System.Exception; #endif namespace Apache.NMS.ActiveMQ.Transport.Mock { /// <summary> /// Transport used for testing, mimics the behaviour of a normal Transport and allows /// messages to be sent and received /// </summary> public class MockTransport : ITransport { #region Properties private string name; private int timeout = -1; private int asynctimeout = -1; private bool failOnSendMessage = false; private int numMessagesToRespondTo = -1; private int numMessagesRespondedTo = 0; private bool respondToMessages = true; private int numSentMessagesBeforeFail = -1; private int numSentMessages = 0; private bool failOnReceiveMessage = false; private int numReceivedMessagesBeforeFail = 0; private int numReceivedMessages = 0; private bool failOnKeepAliveInfoSends = false; private int numSentKeepAliveInfosBeforeFail = 0; private int numSentKeppAliveInfos = 0; private int nextCommandId = 0; private readonly Uri connectedUri; private CommandHandlerAsync commandHandlerAsync; private CommandHandlerAsync outgoingCommandHandlerAsync; private ExceptionHandler exceptionHandler; private InterruptedHandler interruptedHandler; private ResumedHandler resumedHandler; private bool disposed = false; private bool started = false; private readonly TaskRunner asyncResponseTask; private readonly Queue<Command> receiveQueue = new Queue<Command>(); private readonly IResponseBuilder responseBuilder = new OpenWireResponseBuilder(); #endregion #region Async Response Task private class AsyncResponseTask : Task { private readonly MockTransport parent; public AsyncResponseTask(MockTransport parent) { this.parent = parent; } public bool Iterate() { Command command = null; lock(this.parent.receiveQueue) { if(this.parent.receiveQueue.Count == 0) { return false; } // Grab everything that's currently in the Queue, command = this.parent.receiveQueue.Dequeue(); } if(command.IsMessage) { this.parent.NumReceivedMessages++; if(this.parent.FailOnReceiveMessage && this.parent.NumReceivedMessages > this.parent.NumReceivedMessagesBeforeFail) { Tracer.Debug("MockTransport Async Task: Performing configured receive failure."); this.parent.Exception(this.parent, new IOException("Failed to Receive Message.")); } } // Send all the responses. Tracer.Debug("MockTransport Async Task: Simulate receive of Command: " + command.ToString()); this.parent.CommandAsync(this.parent, command).GetAsyncResult(); return parent.receiveQueue.Count != 0; } } #endregion public MockTransport(Uri uri) { this.connectedUri = uri; Tracer.Debug("Creating Async Response task"); asyncResponseTask = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(new AsyncResponseTask(this), "ActiveMQ MockTransport Worker: " + this.GetHashCode().ToString()); } ~MockTransport() { Dispose(false); } public Task<Response> RequestAsync(Command command) { return this.RequestAsync(command, TimeSpan.FromMilliseconds(System.Threading.Timeout.Infinite)); } public Task<Response> RequestAsync(Command command, TimeSpan timeout) { Tracer.Debug("MockTransport sending Request Command: " + command.ToString()); if(command.IsMessage) { this.numSentMessages++; if(this.failOnSendMessage && this.numSentMessages > this.numSentMessagesBeforeFail) { throw new IOException("Failed to Send Message."); } } // Notify external Client of command that we "sent" if(this.OutgoingCommand != null) { this.OutgoingCommand(this, command); } command.CommandId = Interlocked.Increment(ref this.nextCommandId); command.ResponseRequired = true; return System.Threading.Tasks.Task.FromResult<Response>(this.responseBuilder.BuildResponse(command)); } public void Oneway(Command command) { Tracer.Debug("MockTransport sending oneway Command: " + command.ToString()); if(command.IsMessage) { this.numSentMessages++; if(this.failOnSendMessage && this.numSentMessages > this.numSentMessagesBeforeFail) { Tracer.Debug("MockTransport Oneway send, failing as per configuration."); throw new IOException("Failed to Send Message."); } } if(command.IsKeepAliveInfo) { this.numSentKeppAliveInfos++; if(this.failOnKeepAliveInfoSends && this.numSentKeppAliveInfos > this.numSentKeepAliveInfosBeforeFail) { Tracer.Debug("MockTransport Oneway send, failing as per configuration."); throw new IOException("Failed to Send Message."); } } // Process and send any new Commands back. List<Command> results = new List<Command>(); // Let the Response Builder give us the Commands to send to the Client App. if(command.IsMessage) { if(this.respondToMessages && this.NumMessagesToRespondTo < this.numMessagesRespondedTo) { results = this.responseBuilder.BuildIncomingCommands(command); this.numMessagesRespondedTo++; } } else { results = this.responseBuilder.BuildIncomingCommands(command); } lock(this.receiveQueue) { foreach(Command result in results) { this.receiveQueue.Enqueue(result); } } this.asyncResponseTask.Wakeup(); // Send the Command to the Outgoing Command Snoop Hook. if(this.OutgoingCommand != null) { Tracer.Debug("MockTransport Oneway, Notifying Outgoing linstener."); this.OutgoingCommand(this, command); } } public FutureResponse AsyncRequest(Command command) { FutureResponse response = new FutureResponse(); // Delegate to the Request method, it doesn't block. response.Response = this.RequestAsync(command).ConfigureAwait(false).GetAwaiter().GetResult(); return response; } public void Start() { if(commandHandlerAsync == null) { throw new InvalidOperationException("command cannot be null when Start is called."); } if(exceptionHandler == null) { throw new InvalidOperationException("exception cannot be null when Start is called."); } this.started = true; } public System.Threading.Tasks.Task StartAsync() { Start(); return System.Threading.Tasks.Task.CompletedTask; } public void Stop() { this.started = false; } public System.Threading.Tasks.Task StopAsync() { Stop(); return System.Threading.Tasks.Task.CompletedTask; } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { this.started = false; this.disposed = true; } /// <summary> /// Injects a Command into the Transports inbound message queue, the Commands in the /// inbound Queue are dispatched to the registered CommnadHandler instance for /// processing, this simulates receiving a message from an external source, e.g. /// receiving a new message from the Broker. /// </summary> /// <param name="command"> /// A <see cref="CommandAsync"/> /// </param> public void InjectCommand(Command command) { lock(this.receiveQueue) { this.receiveQueue.Enqueue(command); } this.asyncResponseTask.Wakeup(); } public Object Narrow(Type type) { return this.GetType().Equals(type) ? this : null; } #region Property Accessors public string Name { get { return this.name; } set { this.name = value; } } /// <summary> /// Timeout in milliseconds to wait for sending synchronous messages or commands. /// Set to -1 for infinite timeout. /// </summary> public int Timeout { get { return this.timeout; } set { this.timeout = value; } } /// <summary> /// Timeout in milliseconds to wait for sending asynchronous messages or commands. /// Set to -1 for infinite timeout. /// </summary> public int AsyncTimeout { get { return this.asynctimeout; } set { this.asynctimeout = value; } } public CommandHandlerAsync CommandAsync { get { return commandHandlerAsync; } set { this.commandHandlerAsync = value; } } public CommandHandlerAsync OutgoingCommand { get { return outgoingCommandHandlerAsync; } set { this.outgoingCommandHandlerAsync = value; } } public ExceptionHandler Exception { get { return exceptionHandler; } set { this.exceptionHandler = value; } } public InterruptedHandler Interrupted { get { return interruptedHandler; } set { this.interruptedHandler = value; } } public ResumedHandler Resumed { get { return resumedHandler; } set { this.resumedHandler = value; } } public bool IsDisposed { get { return this.disposed; } } public bool IsStarted { get { return this.started; } } public bool FailOnSendMessage { get { return failOnSendMessage; } set { this.failOnSendMessage = value; } } public int NumSentMessagesBeforeFail { get { return numSentMessagesBeforeFail; } set { numSentMessagesBeforeFail = value; } } public int NumSentMessages { get { return numSentMessages; } set { numSentMessages = value; } } public bool FailOnReceiveMessage { get { return failOnReceiveMessage; } set { failOnReceiveMessage = value; } } public int NumReceivedMessagesBeforeFail { get { return numReceivedMessagesBeforeFail; } set { numReceivedMessagesBeforeFail = value; } } public int NumReceivedMessages { get { return numReceivedMessages; } set { numReceivedMessages = value; } } public bool FailOnKeepAliveInfoSends { get { return failOnKeepAliveInfoSends; } set { failOnKeepAliveInfoSends = value; } } public int NumSentKeepAliveInfosBeforeFail { get { return numSentKeepAliveInfosBeforeFail; } set { numSentKeepAliveInfosBeforeFail = value; } } public int NumSentKeppAliveInfos { get { return numSentKeppAliveInfos; } set { numSentKeppAliveInfos = value; } } public int NumMessagesToRespondTo { get { return numMessagesToRespondTo; } set { numMessagesToRespondTo = value; } } public int NumMessagesRespondedTo { get { return numMessagesRespondedTo; } set { numMessagesRespondedTo = value; } } public bool RespondToMessages { get { return respondToMessages; } set { respondToMessages = value; } } public bool IsFaultTolerant { get { return false; } } public bool IsConnected { get { return true; } } public Uri RemoteAddress { get { return connectedUri; } } public bool IsReconnectSupported { get { return false; } } public bool IsUpdateURIsSupported { get { return false; } } public void UpdateURIs(bool rebalance, Uri[] updatedURIs) { throw new IOException(); } public IWireFormat WireFormat { get { return null; } } #endregion } }