rd-net/RdFramework/Impl/SocketWire.cs (589 lines of code) (raw):

using System; using System.Net; using System.Net.Sockets; using System.Threading; using System.Timers; using JetBrains.Annotations; using JetBrains.Collections.Viewable; using JetBrains.Diagnostics; using JetBrains.Lifetimes; using JetBrains.Serialization; using JetBrains.Threading; using Timer = System.Timers.Timer; namespace JetBrains.Rd.Impl { public static class SocketWire { private static readonly ILog ourStaticLog = Log.GetLog<Base>(); public abstract class Base : WireBase { /// <summary> /// Timeout for <see cref="System.Net.Sockets.Socket.Connect(System.Net.EndPoint)"/> and for <see cref="System.Net.Sockets.Socket.Receive(byte[],int,System.Net.Sockets.SocketFlags)"/> from socket (to guarantee read_thread termination if <see cref="System.Net.Sockets.Socket.Close()"/> doesn't /// lead to exception thrown by <see cref="System.Net.Sockets.Socket.Receive(byte[],int,System.Net.Sockets.SocketFlags)"/> /// </summary> public static int TimeoutMs = 500; private const int ACK_MSG_LEN = -1; private const int PING_LEN = -2; /// <summary> /// For logging /// </summary> public readonly string Id; protected readonly ILog Log; /// <summary> /// Lifetime of this wire. If counterpart disconnects, lifetime is not terminate automatically. /// </summary> private readonly Lifetime myLifetime; //All operations must be bound to socket (connect or accept) thread. protected readonly IViewableProperty<Socket> SocketProvider = new ViewableProperty<Socket> (); public readonly IViewableProperty<bool> Connected = new ViewableProperty<bool> { Value = false }; public readonly IViewableProperty<bool> HeartbeatAlive = new ViewableProperty<bool> { Value = false }; protected readonly ByteBufferAsyncProcessor SendBuffer; protected readonly object Lock = new object(); public Socket Socket { get; protected set; } [PublicAPI] public long ReadBytesCount; [PublicAPI] public long WrittenBytesCount; private readonly Actor<long> myAcktor; const string DisconnectedPauseReason = "Disconnected"; protected Base(string id, Lifetime lifetime, IScheduler scheduler) { Id = id; Log = Diagnostics.Log.GetLog(GetType()); myLifetime = lifetime; myAcktor = new Actor<long>(id+"-ACK", lifetime, SendAck); Socket = null!; // inheritor must initialize socket before use SendBuffer = new ByteBufferAsyncProcessor(id+"-Sender", Send0); SendBuffer.Pause(DisconnectedPauseReason); SendBuffer.Start(); Connected.Advise(lifetime, value => HeartbeatAlive.Value = value); //when connected SocketProvider.Advise(lifetime, socket => { // //todo hack for multiconnection, bring it to API // if (SupportsReconnect) SendBuffer.Clear(); var timer = StartHeartbeat(); SendBuffer.ReprocessUnacknowledged(); SendBuffer.Resume(DisconnectedPauseReason); scheduler.Queue(() => { Connected.Value = true; }); try { //use current thread for receiving procedure ReceiverProc(socket); } finally { scheduler.Queue(() => {Connected.Value = false;}); SendBuffer.Pause(DisconnectedPauseReason); timer.Dispose(); CloseSocket(socket); } }); } private static bool ConnectionEstablished(int timeStamp, int notionTimestamp) => timeStamp - notionTimestamp <= MaximumHeartbeatDelay; private Timer StartHeartbeat() { var timer = new Timer(HeartBeatInterval.TotalMilliseconds) { AutoReset = false }; void OnTimedEvent(object sender, ElapsedEventArgs e) { Ping(); timer.Start(); } timer.Elapsed += OnTimedEvent; timer.Start(); return timer; } public static void CloseSocket(Socket? socket) { if (socket == null) return; ourStaticLog.CatchAndDrop(() => socket.Shutdown(SocketShutdown.Both)); //on netcore you can't solely execute Close() - it will hang forever //sometimes on netcoreapp2.1 it could hang forever during <c>Accept()</c> on other thread: https://github.com/dotnet/corefx/issues/26034 //we use zero timeout here to avoid blocking mode with (possible infinite) SpinWait // According to reference source, non-zero timeouts (infinite -1 or positive numbers) lead to the problematic code with hanging spin wait. // The linked corefx issue gives mixed feedback on when it's fixed (netcore 3/net 5), but we have first-hand evidence for issues on netcore 3. // Additionally, the worst thing that Close(0) does is sending a connection reset, which seems to be fine for our purposes. ourStaticLog.CatchAndDrop(() => socket.Close(0)); } private BufferWindow myMsgLengthBuffer; private BufferWindow myPkg; private BufferWindow myPkgBuffer; private BufferWindow myPkgHeaderBuffer; private BufferWindow mySocketBuffer; private void ReceiverProc(Socket socket) { myPkg = new BufferWindow(16384); myPkgBuffer = new BufferWindow(16384); mySocketBuffer = new BufferWindow(16384); myMsgLengthBuffer = new BufferWindow(4); myPkgHeaderBuffer = new BufferWindow(12); while (myLifetime.IsAlive) { if (!socket.Connected) { Log.Verbose("Stop receive messages because socket disconnected"); break; } try { if (!ReadMsg()) { Log.Verbose("{0} Connection was gracefully shutdown", Id); break; } } catch (Exception e) { if (e is SocketException socketEx) { var errcode = socketEx.SocketErrorCode; if (errcode == SocketError.TimedOut || errcode == SocketError.WouldBlock) continue; //expected } if (e is SocketException || e is ObjectDisposedException) { Log.Verbose("Exception in SocketWire.Receive: {0} {1} {2}", e.GetType().Name, Id, e.Message); //That's why we don't use Timeout any more. Exception happens only on windows but blocks socket completely. if (e.Message.ToLower().Contains("Overlapped I/O Operation is in progress".ToLower())) { Log.Error( "ERROR! Socket {0} {1} is in invalid state. Probably no more messages will be received. Exception message: '{2}'. " + "Sometimes it happens because of Timeout property on socket. Your os: {3}.", e.GetType().Name, Id, e.Message, Environment.OSVersion.VersionString); } } else { Log.Error(e); } break; } } LogTraffic(); } private bool ReadMsg() { long maxSeqnAtStart = myMaxReceivedSeqn; myMsgLengthBuffer.Lo = myMsgLengthBuffer.Hi = 0; if (!myMsgLengthBuffer.Read(ref myPkgBuffer, ReceiveFromPkgBuffer)) return false; Int32 len = UnsafeReader.ReadInt32FromBytes(myMsgLengthBuffer.Data); var msgBuffer = new BufferWindow(len); if (!msgBuffer.Read(ref myPkgBuffer, ReceiveFromPkgBuffer)) { Log.Warn("{0}: Can't read message with len={1} from the wire because connection was shut down", Id, len); return false; } if (myMaxReceivedSeqn > maxSeqnAtStart) myAcktor.SendAsync(myMaxReceivedSeqn); Receive(msgBuffer.Data); ReadBytesCount += len + sizeof(Int32 /*len*/); return true; } private int ReceiveFromPkgBuffer(byte[] buffer, int offset, int size) { //size > 0 if (myPkg.Available > 0) { var sizeToCopy = Math.Min(size, myPkg.Available); myPkg.MoveTo(buffer, offset, sizeToCopy); return sizeToCopy; } else { while (true) { myPkgHeaderBuffer.Clear(); if (!myPkgHeaderBuffer.Read(ref mySocketBuffer, ReceiveFromSocket)) return 0; Int32 len = UnsafeReader.ReadInt32FromBytes(myPkgHeaderBuffer.Data); if (len == PING_LEN) { Int32 receivedTimestamp = UnsafeReader.ReadInt32FromBytes(myPkgHeaderBuffer.Data, sizeof(Int32)); Int32 receivedCounterpartTimestamp = UnsafeReader.ReadInt32FromBytes(myPkgHeaderBuffer.Data, sizeof(Int32) + sizeof(Int32)); myCounterpartTimestamp = receivedTimestamp; myCounterpartNotionTimestamp = receivedCounterpartTimestamp; if (ConnectionEstablished(myCurrentTimeStamp, myCounterpartNotionTimestamp)) { if (!HeartbeatAlive.Value) // only on change { Log.WhenTrace()?.Log($"Connection is alive after receiving PING {Id}: " + $"receivedTimestamp: {receivedTimestamp}, " + $"receivedCounterpartTimestamp: {receivedCounterpartTimestamp}, " + $"currentTimeStamp: {myCurrentTimeStamp}, " + $"counterpartTimestamp: {myCounterpartTimestamp}, " + $"counterpartNotionTimestamp: {myCounterpartNotionTimestamp}"); } HeartbeatAlive.Value = true; } continue; } Int64 seqN = UnsafeReader.ReadInt64FromBytes(myPkgHeaderBuffer.Data, sizeof(Int32)); if (len == ACK_MSG_LEN) { SendBuffer.Acknowledge(seqN); } else { myPkg.Clear(); if (!myPkg.Read(ref mySocketBuffer, ReceiveFromSocket, len)) return 0; if (seqN > myMaxReceivedSeqn || seqN == 1 /*TODO new client, possible duplicate problem if ack for seqN=1 from previous client's connection hasn't passed*/) { myMaxReceivedSeqn = seqN; //will be acknowledged when we read whole message Assertion.Assert(myPkg.Available > 0); var sizeToCopy = Math.Min(size, myPkg.Available); myPkg.MoveTo(buffer, offset, sizeToCopy); return sizeToCopy; } else myAcktor.SendAsync(seqN); } } } } private void SendAck(long seqN) { try { using (var cookie = UnsafeWriter.NewThreadLocalWriter()) { cookie.Writer.WriteInt32(ACK_MSG_LEN); cookie.Writer.WriteInt64(seqN); cookie.CopyTo(myAckPkgHeader); } lock (mySocketSendLock) Socket.Send(myAckPkgHeader); } catch (ObjectDisposedException) { Log.Verbose($"{Id}: Socket was disposed during ACK, seqn = {seqN}"); } catch (SocketException e) { // looks like this does not deserve a warn, as the only thing that can happen is a fatal socket failure anyway, and that will likely be reported properly from other threads Log.Verbose(e, $"{Id}: ${e.GetType()} raised during ACK, seqn = {seqN}"); } catch (Exception e) { Log.Warn(e, $"{Id}: {e.GetType()} raised during ACK, seqn = {seqN}"); } } private void Ping() { if (BackwardsCompatibleWireFormat) return; try { if (!ConnectionEstablished(myCurrentTimeStamp, myCounterpartNotionTimestamp)) { if (HeartbeatAlive.Value) // log only on change { Log.WhenTrace()?.Log($"Disconnect detected while sending PING {Id}: " + $"currentTimeStamp: {myCurrentTimeStamp}, " + $"counterpartTimestamp: {myCounterpartTimestamp}, " + $"counterpartNotionTimestamp: {myCounterpartNotionTimestamp}"); } HeartbeatAlive.Value = false; } using (var cookie = UnsafeWriter.NewThreadLocalWriter()) { cookie.Writer.WriteInt32(PING_LEN); cookie.Writer.WriteInt32(myCurrentTimeStamp); cookie.Writer.WriteInt32(myCounterpartTimestamp); cookie.CopyTo(myPingPkgHeader); } lock (mySocketSendLock) Socket.Send(myPingPkgHeader); ++myCurrentTimeStamp; } catch (ObjectDisposedException) { Log.Verbose($"{Id}: Socket was disposed during PING"); } catch (Exception e) { Log.Verbose(e, $"{Id}: {e.GetType()} raised during PING"); } } private readonly object mySocketSendLock = new object(); private int ReceiveFromSocket(byte[] buffer, int offset, int size) { return Socket.Receive(buffer, offset, size, 0); } private long mySentSeqn; private long myMaxReceivedSeqn; private const int PkgHeaderLen = sizeof(int) /*pkgFullLen */ + sizeof(long) /*seqN*/; private readonly byte[] mySendPkgHeader = new byte[ PkgHeaderLen]; private readonly byte[] myAckPkgHeader = new byte[ PkgHeaderLen]; //different threads /// <summary> /// Ping's interval and not actually detection's timeout. /// Its value must be the same on both sides of connection. /// </summary> public TimeSpan HeartBeatInterval { get; set; } = TimeSpan.FromMilliseconds(500); /// <summary> /// Timestamp of this wire which increases at intervals of <see cref="HeartBeatInterval"/> /// </summary> private int myCurrentTimeStamp; /// <summary> /// Actual notion about counterpart's <see cref="myCurrentTimeStamp"/> /// </summary> private int myCounterpartTimestamp; /// <summary> /// The latest received counterpart's notion of this wire's <see cref="myCurrentTimeStamp"/> /// </summary> private int myCounterpartNotionTimestamp; internal const int MaximumHeartbeatDelay = 3; private readonly byte[] myPingPkgHeader = new byte[ PkgHeaderLen]; private void Send0(byte[] data, int offset, int len, ref long seqN) { try { if (seqN == 0) seqN = ++mySentSeqn; using (var cookie = UnsafeWriter.NewThreadLocalWriter()) { cookie.Writer.WriteInt32(len); cookie.Writer.WriteInt64(seqN); cookie.CopyTo(mySendPkgHeader); } lock (mySocketSendLock) { Socket.Send(mySendPkgHeader, 0, PkgHeaderLen, SocketFlags.None); Socket.Send(data, offset, len, SocketFlags.None); } WrittenBytesCount += len; } catch (Exception e) { if (e is SocketException || e is ObjectDisposedException) { SendBuffer.Pause(DisconnectedPauseReason); LogTraffic(); } else { Log.Error(e); } } } protected override void SendPkg(UnsafeWriter.Cookie cookie) { SendBuffer.Put(cookie); } //It's a kind of magic... protected static void SetSocketOptions(Socket s) { s.NoDelay = true; // if (!TimeoutForbidden()) // s.ReceiveTimeout = TimeoutMs; //sometimes shutdown and close doesn't lead Receive to throw exception //following optimization is under Windows only // if (!PlatformUtil.IsRunningUnderWindows) return; // // const int sioLoopbackFastPath = -1744830448; // var optionInValue = BitConverter.GetBytes(1); // // s.IOControl( // sioLoopbackFastPath, // optionInValue, // null); } private void LogTraffic() { Log.Verbose("{0}: Total traffic: sent {1}, received {2}", Id, WrittenBytesCount, ReadBytesCount); } //can't take socket from mySocketProvider: it could be not set yet protected void AddTerminationActions(Thread receiverThread) { // ReSharper disable once ImpureMethodCallOnReadonlyValueField myLifetime.OnTermination(() => { Log.Verbose("{0}: start termination of lifetime", Id); var sendBufferStopped = SendBuffer.Stop(5_000); Log.Verbose("{0}: send buffer stopped, success: {1}", Id, sendBufferStopped); lock (Lock) { Log.Verbose("{0}: closing socket because of lifetime", Id); CloseSocket(Socket); Monitor.PulseAll(Lock); } Log.Verbose("{0}: waiting for receiver thread", Id); if (!receiverThread.Join(TimeoutMs + 100)) Log.Verbose("{0}: unable to join receiver thread", Id); Log.Verbose("{0}: termination finished", Id); } ); } public int Port { get; protected set; } protected virtual bool AcceptHandshake(Socket socket) { return true; } } public class Client : Base { public Client(Lifetime lifetime, IScheduler scheduler, int port, string? optId = null) : this(lifetime, scheduler, new IPEndPoint(IPAddress.Loopback, port), optId) {} public Client(Lifetime lifetime, IScheduler scheduler, IPEndPoint endPoint, string? optId = null) : base("ClientSocket-"+(optId ?? "<noname>"), lifetime, scheduler) { var thread = new Thread(() => { try { Log.Verbose("{0} : started", Id); var lastReportedErrorHash = 0; while (lifetime.IsAlive) { try { var s = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); Socket = s; SetSocketOptions(s); Log.Verbose("{0}: connecting to {1}.", Id, endPoint); s.Connect(endPoint); lock (Lock) { if (!lifetime.IsAlive) { Log.Verbose("{0} : connected, but lifetime is already canceled, closing socket", Id); CloseSocket(s); //to guarantee socket termination return; } else { Log.Verbose("{0} : connected", Id); } } SocketProvider.Value = Socket; } catch (SocketException ex) { var errorHashCode = (ex.Message?.GetHashCode() ?? 0) ^ (ex.StackTrace?.GetHashCode() ?? 0); if (lastReportedErrorHash != errorHashCode) { lastReportedErrorHash = errorHashCode; if (Log.IsVersboseEnabled()) Log.Verbose(ex, $"{Id}: connection error for endpoint \"{endPoint}\"."); } else { Log.Verbose("{0}: connection error for endpoint \"{1}\" ({2}).", Id, endPoint, ex.Message); } lock (Lock) { if (!lifetime.IsAlive) break; Monitor.Wait(Lock, TimeoutMs); if (!lifetime.IsAlive) break; } } } } catch (SocketException e) { Log.Verbose("{0}: SocketException with message {1}", Id, e.Message); } catch (ObjectDisposedException e) { Log.Verbose("{0}: ObjectDisposedException with message {1}", Id, e.Message); } catch (Exception e) { Log.Error(e, Id); } finally { Log.Verbose("{0}: terminated.", Id); } }) {Name = Id+"-Receiver", IsBackground = true}; thread.Start(); AddTerminationActions(thread); } } public class Server : Base { public Server(Lifetime lifetime, IScheduler scheduler, IPEndPoint? endPoint = null, string? optId = null) : this(lifetime, scheduler, optId) { var serverSocket = CreateServerSocket(endPoint); StartServerSocket(lifetime, serverSocket); lifetime.OnTermination(() => { ourStaticLog.Verbose("closing server socket"); CloseSocket(serverSocket); } ); } /// <summary> /// Creates a server wire with an externally-provided socket. By using this constructor, you are not transferring /// ownership of the provided socket to created wire. It is consumer's responsibility to manager socket's lifetime. /// </summary> public Server(Lifetime lifetime, IScheduler scheduler, Socket serverSocket, string? optId = null) : this(lifetime, scheduler, optId) { StartServerSocket(lifetime, serverSocket); } private Server(Lifetime lifetime, IScheduler scheduler, string? optId = null) : base("ServerSocket-"+(optId ?? "<noname>"), lifetime, scheduler) {} public static Socket CreateServerSocket(IPEndPoint? endPoint) { Protocol.InitLogger.Verbose("Creating server socket on endpoint: {0}", endPoint); var serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); SetSocketOptions(serverSocket); endPoint = endPoint ?? new IPEndPoint(IPAddress.Loopback, 0); serverSocket.Bind(endPoint); serverSocket.Listen(1); Protocol.InitLogger.Verbose("Server socket created, listening started on endpoint: {0}", endPoint); return serverSocket; } private void StartServerSocket(Lifetime lifetime, Socket serverSocket) { if (serverSocket == null) throw new ArgumentNullException(nameof(serverSocket)); Port = ((IPEndPoint) serverSocket.LocalEndPoint).Port; Log.Verbose("{0} : started, port: {1}", Id, Port); var thread = new Thread(() => { Log.Catch(() => { while (lifetime.IsAlive) { try { Log.Verbose("{0} : accepting, port: {1}", Id, Port); var s = serverSocket.Accept(); lock (Lock) { if (!lifetime.IsAlive) { Log.Verbose("{0} : connected, but lifetime is already canceled, closing socket", Id); CloseSocket(s); return; } else { Log.Verbose("{0} : accepted", Id); if (!AcceptHandshake(s)) continue; Socket = s; Log.Verbose("{0} : connected", Id); } } SocketProvider.Value = s; } catch (SocketException e) { var errcode = e.SocketErrorCode; if (errcode == SocketError.TimedOut || errcode == SocketError.WouldBlock) continue; //expected, Linux Log.Verbose("{0}: SocketException with message {1}", Id, e.Message); } catch (ObjectDisposedException e) { Log.Verbose("{0}: ObjectDisposedException with message {1}", Id, e.Message); } catch (Exception e) { Log.Error(e, Id); } } }); Log.Verbose("{0}: terminated.", Id); }) {Name = Id + "-Receiver", IsBackground = true}; thread.Start(); AddTerminationActions(thread); } } public struct WireParameters { public readonly IScheduler Scheduler; public readonly string? Id; public WireParameters(IScheduler scheduler, string? id) { Scheduler = scheduler ?? throw new ArgumentNullException(nameof(scheduler)); Id = id; } public void Deconstruct(out IScheduler scheduler, out string? id) { scheduler = Scheduler; id = Id; } } public class ServerFactory { [PublicAPI] public readonly int LocalPort; [PublicAPI] public readonly IViewableSet<Server> Connected = new ViewableSet<Server>(); public ServerFactory(Lifetime lifetime, IScheduler scheduler, IPEndPoint? endpoint = null) : this(lifetime, () => new WireParameters(scheduler, null), endpoint) {} public ServerFactory( Lifetime lifetime, Func<WireParameters> wireParametersFactory, IPEndPoint? endpoint = null ) { var serverSocket = Server.CreateServerSocket(endpoint); var serverSocketLifetimeDef = new LifetimeDefinition(lifetime); serverSocketLifetimeDef.Lifetime.OnTermination(() => { ourStaticLog.Verbose("closing server socket"); Base.CloseSocket(serverSocket); }); LocalPort = ((IPEndPoint) serverSocket.LocalEndPoint).Port; void Rec() { lifetime.TryExecute(() => { var (scheduler, id) = wireParametersFactory(); var s = new Server(lifetime, scheduler, serverSocket, id); // Each server will spawn a thread that will be waiting in serverSocket.Accept method. When lifetime // termination is invoked, these threads synchronously join the termination thread. Since these Thread.Join // calls are located deeper in the Lifetime termination stack we have to place this socket termination call // after each server creation. lifetime.OnTermination(() => serverSocketLifetimeDef.Terminate()); s.Connected.WhenTrue(lifetime, lt => { Connected.AddLifetimed(lt, s); Rec(); }); }); } Rec(); } } } }