public unsafe void StressTestWithAck()

in rd-net/Test.Lifetimes/Threading/ByteBufferAsyncProcessorTest.cs [114:181]


    public unsafe void StressTestWithAck()
    {
      // Stress scenario: several producer tasks write strictly increasing 64-bit values into a
      // ByteBufferAsyncProcessor for roughly one second, now and then sleeping or clearing the
      // buffer. The processing callback reads each value back, asserts it is greater than the
      // previously read one, and acknowledges it.

      // Debug aid: uncomment to print log records to the console.
//      LogLog.SeverityFilter = LoggingLevel.VERBOSE;
//      LogLog.RecordsChanged += record => { Console.WriteLine(record.Format(true)); };

      const int producerCount = 4;
      const int durationMs = 1000;
      // With a period of 1 the modulo check below is always true, so every value is acknowledged.
      const int ackPeriod = 1;

      long lastSeen = 0;

      // Initialised to null first because the processing callback below captures the variable
      // that is being assigned.
      ByteBufferAsyncProcessor buffer = null;
      buffer = new ByteBufferAsyncProcessor("TestAsyncProcessor", 8,
        (byte[] data, int offset, int len, ref long seqN) =>
        {
          // A ref parameter cannot be captured by the nested lambda, so the value is carried
          // out through a local and copied into seqN afterwards.
          long value = 0;
          Log.Root.Catch(() =>
          {
            fixed (byte* ptr = data)
            {
              value = UnsafeReader.CreateReader(ptr, 8).ReadLong();
              Assert.True(value > lastSeen);
              lastSeen = value;

              if (value % ackPeriod == 0)
                buffer?.Acknowledge(value);
            }
          });
          seqN = value;
        });
      buffer.ShrinkIntervalMs = 10;
      buffer.Start();

      var startedAt = Environment.TickCount;

      // True while less than durationMs milliseconds have elapsed since startedAt.
      bool StillRunning() => Environment.TickCount - startedAt < durationMs;

      long next = 0;
      var gate = new object();

      // Body of a single producer task.
      void Produce()
      {
        var random = new Random();

        while (StillRunning())
        {
          // The increment and the Put share one lock, so producers cannot interleave between
          // taking a number and enqueuing it.
          lock (gate)
          {
            using (var cookie = UnsafeWriter.NewThreadLocalWriter())
            {
              cookie.Writer.WriteInt64(++next);
              buffer.Put(cookie);
            }
          }

          // Roughly 1 iteration in 1000 pauses briefly...
          if (random.Next(1000) < 1)
            Thread.Sleep(1);

          // ...and roughly 5 in 1000 clear the buffer.
          if (random.Next(1000) < 5)
            buffer.Clear();
        }
      }

      var producers = new Task[producerCount];
      for (var i = 0; i < producers.Length; i++)
      {
        producers[i] = Task.Run(() => Produce());
      }

      Task.WaitAll(producers);
//      Console.WriteLine(next);
//      Console.WriteLine(buffer.ChunkCount);
    }