rd-net/Test.RdFramework/SocketWireTest.cs (395 lines of code) (raw):

using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; using System.Threading; using JetBrains.Collections.Viewable; using JetBrains.Core; using JetBrains.Diagnostics; using JetBrains.Diagnostics.Internal; using JetBrains.Lifetimes; using JetBrains.Rd; using JetBrains.Rd.Base; using JetBrains.Rd.Impl; using JetBrains.Threading; using NUnit.Framework; using Test.Lifetimes; namespace Test.RdFramework { [TestFixture] public class SocketWireTest : LifetimesTestBase { internal static TimeSpan DefaultTimeout = TimeSpan.FromMilliseconds(100); internal const string Top = "top"; private void WaitAndAssert<T>(RdProperty<T> property, T expected, T prev) { WaitAndAssert(property, expected, new Maybe<T>(prev)); } private void WaitAndAssert<T>(RdProperty<T> property, T expected, Maybe<T> prev = default(Maybe<T>)) { var start = Environment.TickCount; const int timeout = 5000; while (Environment.TickCount - start < timeout && property.Maybe == prev) Thread.Sleep(10); if (property.Maybe == prev) throw new TimeoutException($"Timeout {timeout} ms while waiting for value '{expected}'"); Assert.AreEqual(expected, property.Value); } static int FindFreePort() { TcpListener l = new TcpListener(IPAddress.Loopback, 0); l.Start(); int port = ((IPEndPoint) l.LocalEndpoint).Port; l.Stop(); return port; } internal static IProtocol Server(Lifetime lifetime, int? port = null) { var id = "TestServer"; var server = new SocketWire.Server(lifetime, SynchronousScheduler.Instance, new IPEndPoint(IPAddress.Loopback, port ?? 0), id); return new Protocol(id, new Serializers(), new Identities(IdKind.Server), SynchronousScheduler.Instance, server, lifetime); } internal static IProtocol Client(Lifetime lifetime, int port) { var id = "TestClient"; var client = new SocketWire.Client(lifetime, SynchronousScheduler.Instance, port, id); return new Protocol(id, new Serializers(), new Identities(IdKind.Server), SynchronousScheduler.Instance, client, lifetime); } internal static IProtocol Client(Lifetime lifetime, IProtocol serverProtocol) { // ReSharper disable once PossibleNullReferenceException return Client(lifetime, (serverProtocol.Wire as SocketWire.Server).Port); } [Test] public void TestBasicRun() { Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); var serverProtocol = Server(lifetime); var clientProtocol = Client(lifetime, serverProtocol); var sp = NewRdProperty<int>().Static(1); sp.BindTopLevel(lifetime, serverProtocol, Top); var cp = NewRdProperty<int>().Static(1); cp.BindTopLevel(lifetime, clientProtocol, Top); cp.SetValue(1); WaitAndAssert(sp, 1); }); } [Test] public void TestOrdering() { Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); var serverProtocol = Server(lifetime); var clientProtocol = Client(lifetime, serverProtocol); var sp = NewRdProperty<int>().Static(1); sp.BindTopLevel(lifetime, serverProtocol, Top); var cp = NewRdProperty<int>().Static(1); cp.BindTopLevel(lifetime, clientProtocol, Top); var log = new List<int>(); sp.Advise(lifetime, it => log.Add(it)); sp.SetValue(1); sp.SetValue(2); sp.SetValue(3); sp.SetValue(4); sp.SetValue(5); while (log.Count < 5) Thread.Sleep(10); CollectionAssert.AreEqual(new[] {1, 2, 3, 4, 5}, log); }); } [Test] public void TestBigBuffer() { Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); var serverProtocol = Server(lifetime); var clientProtocol = Client(lifetime, serverProtocol); var sp = NewRdProperty<string>().Static(1); sp.BindTopLevel(lifetime, serverProtocol, Top); var cp = NewRdProperty<string>().Static(1); cp.BindTopLevel(lifetime, clientProtocol, Top); cp.SetValue("1"); WaitAndAssert(sp, "1"); sp.SetValue(new string('a', 100000)); WaitAndAssert(cp, new string('a', 100000), "1"); cp.SetValue("a"); WaitAndAssert(sp, "a", new string('a', 100000)); cp.SetValue("ab"); WaitAndAssert(sp, "ab", "a"); cp.SetValue("abc"); WaitAndAssert(sp, "abc", "ab"); }); } [Test] public void TestRunWithSlowpokeServer() { Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); var port = FindFreePort(); var clientProtocol = Client(lifetime, port); var cp = NewRdProperty<int>().Static(1); cp.BindTopLevel(lifetime, clientProtocol, Top); cp.SetValue(1); Thread.Sleep(2000); var serverProtocol = Server(lifetime, port); var sp = NewRdProperty<int>().Static(1); sp.BindTopLevel(lifetime, serverProtocol, Top); var prev = sp.Maybe; cp.SetValue(4); Thread.Sleep(200); WaitAndAssert(sp, 4, prev); }); } [Test] [Timeout(5000)] public void TestServerWithoutClient() { Lifetime.Using(lifetime => { WithLongTimeout(lifetime); SynchronousScheduler.Instance.SetActive(lifetime); Server(lifetime); }); } [Test] public void TestServerWithoutClientWithDelay() { Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); Server(lifetime); Thread.Sleep(100); }); } [Test] public void TestServerWithoutClientWithDelayAndMessages() { Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); var protocol = Server(lifetime); Thread.Sleep(100); var p = NewRdProperty<int>().Static(1); p.BindTopLevel(lifetime, protocol, Top); p.SetValue(1); p.SetValue(2); Thread.Sleep(50); }); } [Test] [Timeout(5000)] public void TestClientWithoutServer() { Lifetime.Using(lifetime => { WithLongTimeout(lifetime); SynchronousScheduler.Instance.SetActive(lifetime); Client(lifetime, FindFreePort()); }); } [Test] public void TestClientWithoutServerWithDelay() { Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); Client(lifetime, FindFreePort()); Thread.Sleep(100); }); } [Test] public void TestClientWithoutServerWithDelayAndMessages() { Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); var protocol = Client(lifetime, FindFreePort()); Thread.Sleep(100); var p = NewRdProperty<int>().Static(1); p.BindTopLevel(lifetime, protocol, Top); p.SetValue(1); p.SetValue(2); Thread.Sleep(50); }); } [Test, Ignore("https://github.com/JetBrains/rd/issues/69")] public void TestDisconnect() => TestDisconnectBase((list, i) => list.Add(i)); [Test] public void TestDisconnect_AllowDuplicates() => TestDisconnectBase((list, i) => { // values may be duplicated due to asynchronous acknowledgement if (list.LastOrDefault() < i) list.Add(i); }); private void TestDisconnectBase(Action<List<int>, int> advise) { var timeout = TimeSpan.FromSeconds(1); Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); var serverProtocol = Server(lifetime); var clientProtocol = Client(lifetime, serverProtocol); var sp = NewRdSignal<int>().Static(1); sp.BindTopLevel(lifetime, serverProtocol, Top); var cp = NewRdSignal<int>().Static(1); cp.BindTopLevel(lifetime, clientProtocol, Top); var log = new List<int>(); sp.Advise(lifetime, i => advise(log, i)); cp.Fire(1); cp.Fire(2); Assert.True(SpinWaitEx.SpinUntil(timeout, () => log.Count == 2)); Assert.AreEqual(new List<int> {1, 2}, log); CloseSocket(clientProtocol); cp.Fire(3); cp.Fire(4); Assert.True(SpinWaitEx.SpinUntil(timeout, () => log.Count == 4)); Assert.AreEqual(new List<int> {1, 2, 3, 4}, log); CloseSocket(serverProtocol); cp.Fire(5); cp.Fire(6); Assert.True(SpinWaitEx.SpinUntil(timeout, () => log.Count == 6)); Assert.AreEqual(new List<int> {1, 2, 3, 4, 5, 6}, log); }); } [Test] public void TestReconnect() { Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); var serverProtocol = Server(lifetime, null); var sp = NewRdProperty<int>().Static(1); sp.BindTopLevel(lifetime, serverProtocol, Top); sp.IsMaster = false; var wire = serverProtocol.Wire as SocketWire.Base; int clientCount = 0; wire.NotNull().Connected.WhenTrue(lifetime, _ => { clientCount++; }); Assert.AreEqual(0, clientCount); Lifetime.Using(lf => { var clientProtocol = Client(lf, serverProtocol); var cp = NewRdProperty<int>().Static(1); cp.IsMaster = true; cp.BindTopLevel(lf, clientProtocol, Top); cp.SetValue(1); WaitAndAssert(sp, 1); Assert.AreEqual(1, clientCount); }); Lifetime.Using(lf => { sp = NewRdProperty<int>().Static(2); sp.BindTopLevel(lifetime, serverProtocol, Top); var clientProtocol = Client(lf, serverProtocol); var cp = NewRdProperty<int>().Static(2); cp.BindTopLevel(lf, clientProtocol, Top); cp.SetValue(2); WaitAndAssert(sp, 2); Assert.AreEqual(2, clientCount); }); Lifetime.Using(lf => { var clientProtocol = Client(lf, serverProtocol); var cp = NewRdProperty<int>().Static(2); cp.BindTopLevel(lf, clientProtocol, Top); cp.SetValue(3); WaitAndAssert(sp, 3, 2); Assert.AreEqual(3, clientCount); }); }); } [TestCase(true)] [TestCase(false)] public void TestPacketLoss(bool isClientToServer) { using (Log.UsingLogFactory(new TextWriterLogFactory(Console.Out, LoggingLevel.TRACE))) Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); var serverProtocol = Server(lifetime); var serverWire = (SocketWire.Base) serverProtocol.Wire; var proxy = new SocketProxy("TestProxy", lifetime, serverProtocol); proxy.Start(); var clientProtocol = Client(lifetime, proxy.Port); var clientWire = (SocketWire.Base) clientProtocol.Wire; Thread.Sleep(DefaultTimeout); if (isClientToServer) proxy.StopClientToServerMessaging(); else proxy.StopServerToClientMessaging(); var detectionTimeoutTicks = ((SocketWire.Base) clientProtocol.Wire).HeartBeatInterval.Ticks * (SocketWire.Base.MaximumHeartbeatDelay + 3); var detectionTimeout = TimeSpan.FromTicks(detectionTimeoutTicks); Thread.Sleep(detectionTimeout); Assert.IsTrue(serverWire.Connected.Value); Assert.IsTrue(clientWire.Connected.Value); Assert.IsFalse(serverWire.HeartbeatAlive.Value); Assert.IsFalse(clientWire.HeartbeatAlive.Value); if (isClientToServer) proxy.StartClientToServerMessaging(); else proxy.StartServerToClientMessaging(); Thread.Sleep(detectionTimeout); Assert.IsTrue(serverWire.Connected.Value); Assert.IsTrue(clientWire.Connected.Value); Assert.IsTrue(serverWire.HeartbeatAlive.Value); Assert.IsTrue(clientWire.HeartbeatAlive.Value); }); } [Test] [Ignore("Not enough timeout to get the correct test")] public void TestStressHeartbeat() { // using (Log.UsingLogFactory(new TextWriterLogFactory(Console.Out, LoggingLevel.TRACE))) Lifetime.Using(lifetime => { SynchronousScheduler.Instance.SetActive(lifetime); var interval = TimeSpan.FromMilliseconds(50); var serverProtocol = Server(lifetime); var serverWire = ((SocketWire.Base) serverProtocol.Wire).With(wire => wire.HeartBeatInterval = interval); var latency = TimeSpan.FromMilliseconds(40); var proxy = new SocketProxy("TestProxy", lifetime, serverProtocol) {Latency = latency}; proxy.Start(); var clientProtocol = Client(lifetime, proxy.Port); var clientWire = ((SocketWire.Base) clientProtocol.Wire).With(wire => wire.HeartBeatInterval = interval); Thread.Sleep(DefaultTimeout); serverWire.HeartbeatAlive.WhenFalse(lifetime, _ => Assert.Fail("Detected false disconnect on server side")); clientWire.HeartbeatAlive.WhenFalse(lifetime, _ => Assert.Fail("Detected false disconnect on client side")); Thread.Sleep(TimeSpan.FromSeconds(50)); }); } [Test] public void TestSocketFactory() { var sLifetime = new LifetimeDefinition(); var factory = new SocketWire.ServerFactory(sLifetime.Lifetime, SynchronousScheduler.Instance); var lf1 = new LifetimeDefinition(); new SocketWire.Client(lf1.Lifetime, SynchronousScheduler.Instance, factory.LocalPort); SpinWaitEx.SpinUntil(() => factory.Connected.Count == 1); var lf2 = new LifetimeDefinition(); new SocketWire.Client(lf2.Lifetime, SynchronousScheduler.Instance, factory.LocalPort); SpinWaitEx.SpinUntil(() => factory.Connected.Count == 2); lf1.Terminate(); SpinWaitEx.SpinUntil(() => factory.Connected.Count == 1); sLifetime.Terminate(); SpinWaitEx.SpinUntil(() => factory.Connected.Count == 0); } private static void CloseSocket(IProtocol protocol) { if (!(protocol.Wire is SocketWire.Base socketWire)) { Assert.Fail(); return; } SocketWire.Base.CloseSocket(socketWire.Socket.NotNull()); } private static void WithLongTimeout(Lifetime lifetime) { var oldValue = SocketWire.Base.TimeoutMs; lifetime.Bracket(() => SocketWire.Base.TimeoutMs = 100_000, () => SocketWire.Base.TimeoutMs = oldValue); } } }