rd-net/Test.Lifetimes/Threading/ByteBufferAsyncProcessorTest.cs (203 lines of code) (raw):
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Diagnostics;
using JetBrains.Serialization;
using JetBrains.Threading;
using NUnit.Framework;
namespace Test.Lifetimes.Threading
{
[TestFixture]
public class ByteBufferAsyncProcessorTest : LifetimesTestBase
{
[Test]
public void TestOneProducer()
{
var processed = 0;
var buffer = new ByteBufferAsyncProcessor("TestAsyncProcessor", 10, delegate(byte[] data, int offset, int len, ref long seqN)
{
Assert.Greater(len, 0);
for (int i = 0; i < len - 1; i++)
{
Assert.AreEqual(data[offset + i+1], (byte)(data[offset + i]+1));
}
processed += len;
});
byte b = 0;
int l = 0;
var toProcess = 0;
for (int i = 0; i < 300; i++)
{
toProcess += l;
byte[] p = new byte[l++];
for (int j = 0; j < p.Length; j++)
{
p[j] = b++;
}
buffer.Put(p);
if (i == 20)
{
buffer.Start(); //testing delayed start
}
if (i > 0 && i % 50 == 0)
{
SpinWaitEx.SpinUntil(() => buffer.AllDataProcessed); //give it to process
}
}
SpinWaitEx.SpinUntil(() => buffer.AllDataProcessed); //give it to process
Assert.AreEqual(toProcess, processed);
Assert.True(buffer.Stop(1000));
}
[Test]
public void TestClean()
{
int x = 0;
var buffer = new ByteBufferAsyncProcessor("TestAsyncProcessor", 1, delegate(byte[] data, int offset, int len, ref long seqN) { x += data[offset]; });
buffer.Put(new byte[]{1});
buffer.Put(new byte[]{2});
buffer.Put(new byte[]{3});
Assert.False(buffer.AllDataProcessed); //not started
buffer.Clear();
Assert.True(buffer.AllDataProcessed);
buffer.Start();
buffer.Put(new byte[]{1, 2, 3});
SpinWaitEx.SpinUntil(() => buffer.AllDataProcessed);
Assert.AreEqual(6, x);
}
[Test]
public void TestPause()
{
int x = 0;
var buffer = new ByteBufferAsyncProcessor("TestAsyncProcessor", 1, delegate(byte[] data, int offset, int len, ref long seqN) { x += data[offset]; });
var reason1 = "reason1";
var reason2 = "reason2";
Assert.True(buffer.Pause(reason1));
Assert.False(buffer.Pause(reason1));
Assert.True(buffer.Pause(reason2));
Assert.False(buffer.Pause(reason2));
buffer.Start();
buffer.Put(new byte[]{1, 2, 3});
Thread.Sleep(50);
Assert.AreEqual(0, x);
Assert.True(buffer.Resume(reason1));
Assert.False(buffer.Resume(reason1));
Thread.Sleep(50);
Assert.AreEqual(0, x);
Assert.True(buffer.Resume(reason2));
Assert.False(buffer.Resume(reason2));
Assert.False(buffer.Resume(reason2));
SpinWaitEx.SpinUntil(() => buffer.AllDataProcessed);
Assert.AreEqual(6, x);
}
[Test]
public unsafe void StressTestWithAck()
{
// LogLog.SeverityFilter = LoggingLevel.VERBOSE;
// LogLog.RecordsChanged += record => { Console.WriteLine(record.Format(true)); };
long prev = 0;
ByteBufferAsyncProcessor buffer = null;
buffer = new ByteBufferAsyncProcessor("TestAsyncProcessor", 8,
delegate(byte[] data, int offset, int len, ref long seqN)
{
long l = 0;
Log.Root.Catch(() =>
{
fixed (byte* b = data)
{
l = UnsafeReader.CreateReader(b, 8).ReadLong();
Assert.True(l > prev);
prev = l;
if (l % 1 == 0)
Ack(l);
}
});
seqN = l;
});
buffer.ShrinkIntervalMs = 10;
buffer.Start();
void Ack(long seqn)
{
buffer?.Acknowledge(seqn);
}
var start = Environment.TickCount;
bool Until() => Environment.TickCount - start < 1000;
long next = 0;
var tasks = new List<Task>();
for (var index = 0; index < 4; index++)
{
tasks.Add(Task.Run(() =>
{
var rnd = new Random();
while (Until())
{
lock (tasks)
{
using (var cookie = UnsafeWriter.NewThreadLocalWriter())
{
cookie.Writer.WriteInt64(++next);
buffer.Put(cookie);
}
}
if (rnd.Next(1000) < 1) Thread.Sleep(1);
if (rnd.Next(1000) < 5)
buffer.Clear();
}
}));
}
Task.WaitAll(tasks.ToArray());
// Console.WriteLine(next);
// Console.WriteLine(buffer.ChunkCount);
}
[Test]
public unsafe void TestReprocess()
{
long prev = 0;
ByteBufferAsyncProcessor buffer = null;
List<long> log = new List<long>();
buffer = new ByteBufferAsyncProcessor("TestAsyncProcessor", 8,
delegate(byte[] data, int offset, int len, ref long seqN)
{
try
{
fixed (byte* b = data)
{
long l = UnsafeReader.CreateReader(b, 8).ReadLong();
if (seqN != 0)
Assert.AreEqual(l, seqN);
seqN = l;
log.Add(l);
Assert.True(l > prev);
prev = l;
}
}
catch (Exception e)
{
Log.Root.Error(e);
}
});
buffer.ShrinkIntervalMs = 10;
buffer.Start();
PutLong(buffer, 1);
PutLong(buffer, 2);
PutLong(buffer, 3);
PutLong(buffer, 4);
SpinWaitEx.SpinUntil(() => buffer.AllDataProcessed);
Assert.AreEqual(new List<int> {1, 2, 3, 4}, log);
buffer.Acknowledge(2);
prev = 2;
buffer.ReprocessUnacknowledged();
PutLong(buffer, 5);
SpinWaitEx.SpinUntil(() => buffer.AllDataProcessed);
Assert.AreEqual(new List<int> {1, 2, 3, 4, 3, 4, 5}, log);
buffer.Acknowledge(5);
buffer.ReprocessUnacknowledged();
SpinWaitEx.SpinUntil(() => buffer.AllDataProcessed);
Assert.AreEqual(new List<int> {1, 2, 3, 4, 3, 4, 5}, log);
}
private void PutLong(ByteBufferAsyncProcessor buffer, long l)
{
using (var cookie = UnsafeWriter.NewThreadLocalWriter())
{
cookie.Writer.WriteInt64(l);
buffer.Put(cookie);
}
}
}
}