Source/Tx.Network/Udp/BaseUdpReceiver.cs (134 lines of code) (raw):

namespace Tx.Network
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Net;
    using System.Net.Sockets;
    using System.Reactive.Subjects;
    using System.Threading;

    /// <summary>
    /// Base class that binds a raw IPv4 socket for the UDP protocol, parses every received buffer into an
    /// <c>IpPacket</c> and publishes whatever the derived class's <see cref="TryParse"/> extracts from it
    /// to the subscribed observers.
    /// </summary>
    /// <typeparam name="T">Type of the items published to observers.</typeparam>
    /// <remarks>
    /// The socket is created lazily on the first call to <see cref="Subscribe"/>. Receive completions are
    /// handled on whatever thread the socket layer raises them on, so observers may be called concurrently
    /// when more than one receiver is configured.
    /// </remarks>
    public abstract class BaseUdpReceiver<T> : IObservable<T>, IDisposable
    {
        #region Public Properties

        /// <summary>Gets the local end point the socket is bound to.</summary>
        public IPEndPoint ListenEndPoint { get; private set; }

        /// <summary>Gets the protocol this receiver listens for; the constructor always sets it to <see cref="ProtocolType.Udp"/>.</summary>
        public ProtocolType ListenProtocol { get; private set; }

        /// <summary>Gets the number of pre-allocated receive operations (and therefore receive buffers).</summary>
        public uint ConcurrentReceivers { get; private set; }

        #endregion

        #region Private Fields

        /// <summary>Size, in bytes, of the buffer given to each receive operation.</summary>
        private const int ReceiveBufferLength = ushort.MaxValue;

        private Socket socket;
        private readonly Subject<T> packetSubject;
        private ConcurrentQueue<SocketAsyncEventArgs> receivedDataProcessorsPool;
        private bool subscribed;
        private readonly List<IDisposable> disposables = new List<IDisposable>();

        #endregion

        #region Constructors

        /// <summary>
        /// Constructs a Receiver of Observable Packets
        /// </summary>
        /// <param name="listenEndPoint">IPEndPoint constructed with any valid local Ipv4 IP or IPAddress.Any and UDP port number</param>
        /// <param name="concurrentReceivers">Number of concurrent packet processors to use. Each one is allocated a buffer of 65,535 bytes of memory.</param>
        /// <remarks>Concurrent receivers allow for scaling of allocated buffers.
        /// Each receiver holds up to 65,535 bytes and multiple receivers allow for concurrent packet
        /// reception from the underlying socket object.</remarks>
        protected BaseUdpReceiver(IPEndPoint listenEndPoint, uint concurrentReceivers)
        {
            this.ListenProtocol = ProtocolType.Udp;
            this.ConcurrentReceivers = concurrentReceivers;
            this.ListenEndPoint = listenEndPoint;
            this.packetSubject = new Subject<T>();
        }

        #endregion

        #region Public Methods

        /// <summary>
        /// Subscribes an observer to the observable packet stream. The first subscription creates and binds
        /// the socket and starts receiving.
        /// </summary>
        /// <param name="observer">Observer accepting items of type <typeparamref name="T"/>.</param>
        /// <returns>IDisposable object that removes the observer from the stream when disposed.</returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            var subscription = this.packetSubject.Subscribe(observer);

            if (!this.subscribed)
            {
                this.subscribed = true;
                this.Start();
            }

            return subscription;
        }

        /// <summary>
        /// Gets the amount of data that has been received from the network and is available to be read.
        /// </summary>
        /// <value>
        /// The amount of data that has been received from the network and is available to be read,
        /// or 0 when the socket does not exist (before the first subscription or after disposal).
        /// </value>
        public int AvailableBytes
        {
            get
            {
                // The socket only exists between the first Subscribe and Dispose; read the field once so a
                // concurrent Dispose cannot null it between the check and the use.
                var activeSocket = this.socket;
                return activeSocket == null ? 0 : activeSocket.Available;
            }
        }

        #endregion

        #region Private Methods

        /// <summary>
        /// Allocates the pool of receive operations, creates and binds the socket, and posts the first receive.
        /// </summary>
        private void Start()
        {
            this.receivedDataProcessorsPool = new ConcurrentQueue<SocketAsyncEventArgs>();
            var completedHandler = new EventHandler<SocketAsyncEventArgs>(this.ReceiveCompletedHandler);

            // pre-allocate the SocketAsyncEventArgs in a receiver queue to constrain memory usage for buffers
            for (var i = 0; i < this.ConcurrentReceivers; i++)
            {
                var receiveArgs = new SocketAsyncEventArgs();
                receiveArgs.SetBuffer(new byte[ReceiveBufferLength], 0, ReceiveBufferLength);
                receiveArgs.Completed += completedHandler;

                this.receivedDataProcessorsPool.Enqueue(receiveArgs);
                this.disposables.Add(receiveArgs);
            }

            this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Udp)
            {
                ReceiveBufferSize = int.MaxValue
            };
            this.socket.Bind(this.ListenEndPoint);

            this.GetDataProcessorAndReceive();
        }

        /// <summary>
        /// Handles a finished receive: posts another receive, publishes the parsed packet when the receive
        /// succeeded, then returns the operation to the pool.
        /// </summary>
        /// <param name="caller">Sender of the completion; not used.</param>
        /// <param name="socketArgs">The completed receive operation and its buffer.</param>
        private void ReceiveCompletedHandler(object caller, SocketAsyncEventArgs socketArgs)
        {
            if (this.disposeCalled)
            {
                return;
            }

            // Post the next receive before doing any parsing so the socket keeps being drained.
            this.GetDataProcessorAndReceive();

            // Check the outcome first: a failed or aborted receive leaves stale bytes in the buffer, and
            // those must not be parsed or handed to TryParse.
            if (socketArgs.LastOperation == SocketAsyncOperation.Receive
                && socketArgs.SocketError == SocketError.Success)
            {
                // NOTE(review): the whole buffer length is passed rather than socketArgs.BytesTransferred;
                // presumably the parser takes the real length from the IP header - confirm.
                var ipPacket = PacketParser.Parse(
                    DateTimeOffset.UtcNow,
                    false,
                    socketArgs.Buffer,
                    0,
                    socketArgs.Buffer.Length);

                T packet;
                if (this.TryParse(ipPacket, out packet))
                {
                    this.packetSubject.OnNext(packet);
                }
            }

            socketArgs.SetBuffer(0, ReceiveBufferLength);
            this.receivedDataProcessorsPool.Enqueue(socketArgs);

            // The pool may have been empty at the top of this method; try again now that one was returned.
            this.GetDataProcessorAndReceive();
        }

        /// <summary>
        /// Takes one receive operation from the pool, if any is available, and posts it on the socket.
        /// </summary>
        private void GetDataProcessorAndReceive()
        {
            SocketAsyncEventArgs receiveArgs;
            if (!this.receivedDataProcessorsPool.TryDequeue(out receiveArgs))
            {
                return;
            }

            // Dispose sets the field to null from another thread, so work on a local copy.
            var activeSocket = this.socket;
            if (activeSocket == null)
            {
                return;
            }

            if (receiveArgs.Offset != 0 || receiveArgs.Count != ReceiveBufferLength)
            {
                receiveArgs.SetBuffer(0, ReceiveBufferLength);
            }

            bool pending;
            try
            {
                pending = activeSocket.ReceiveAsync(receiveArgs);
            }
            catch (ObjectDisposedException)
            {
                // The receiver was disposed while this receive was being posted; nothing left to do.
                return;
            }

            if (!pending)
            {
                // A synchronous completion does not raise Completed, so the result has to be handled
                // explicitly or both the datagram and the pooled operation are lost. It is queued to the
                // thread pool rather than called inline because the handler posts further receives, which
                // could otherwise recurse without bound while data keeps arriving.
                ThreadPool.QueueUserWorkItem(
                    state => this.ReceiveCompletedHandler(activeSocket, (SocketAsyncEventArgs)state),
                    receiveArgs);
            }
        }

        /// <summary>
        /// Converts a parsed IP packet into the item type published by this receiver.
        /// </summary>
        /// <param name="packet">The IP packet parsed from the receive buffer.</param>
        /// <param name="envelope">The item to publish when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> to publish <paramref name="envelope"/>; <c>false</c> to drop the packet.</returns>
        protected abstract bool TryParse(IpPacket packet, out T envelope);

        #endregion

        #region IDisposable Support

        // Written by Dispose and read on receive-completion threads.
        private volatile bool disposeCalled;

        /// <summary>
        /// Stops receiving, releases the socket and the pooled receive operations, and completes the
        /// observable stream. Calls after the first one do nothing.
        /// </summary>
        public void Dispose()
        {
            if (this.disposeCalled)
            {
                return;
            }

            this.disposeCalled = true;

            var activeSocket = this.socket;
            this.socket = null;

            if (activeSocket != null)
            {
                try
                {
                    activeSocket.Shutdown(SocketShutdown.Both);
                }
                catch (SocketException)
                {
                    // Best effort: shutting down a socket that is not connected can fail.
                }
                catch (ObjectDisposedException)
                {
                    // Already closed.
                }
                finally
                {
                    // Release the handle even when Shutdown failed.
                    activeSocket.Dispose();
                }
            }

            foreach (var toDispose in this.disposables)
            {
                toDispose.Dispose();
            }

            this.packetSubject.OnCompleted();
            this.packetSubject.Dispose();
        }

        #endregion
    }
}