in rd-net/Test.Lifetimes/Threading/ByteBufferAsyncProcessorTest.cs [114:181]
public unsafe void StressTestWithAck()
{
// LogLog.SeverityFilter = LoggingLevel.VERBOSE;
// LogLog.RecordsChanged += record => { Console.WriteLine(record.Format(true)); };
long prev = 0;
ByteBufferAsyncProcessor buffer = null;
buffer = new ByteBufferAsyncProcessor("TestAsyncProcessor", 8,
delegate(byte[] data, int offset, int len, ref long seqN)
{
long l = 0;
Log.Root.Catch(() =>
{
fixed (byte* b = data)
{
l = UnsafeReader.CreateReader(b, 8).ReadLong();
Assert.True(l > prev);
prev = l;
if (l % 1 == 0)
Ack(l);
}
});
seqN = l;
});
buffer.ShrinkIntervalMs = 10;
buffer.Start();
void Ack(long seqn)
{
buffer?.Acknowledge(seqn);
}
var start = Environment.TickCount;
bool Until() => Environment.TickCount - start < 1000;
long next = 0;
var tasks = new List<Task>();
for (var index = 0; index < 4; index++)
{
tasks.Add(Task.Run(() =>
{
var rnd = new Random();
while (Until())
{
lock (tasks)
{
using (var cookie = UnsafeWriter.NewThreadLocalWriter())
{
cookie.Writer.WriteInt64(++next);
buffer.Put(cookie);
}
}
if (rnd.Next(1000) < 1) Thread.Sleep(1);
if (rnd.Next(1000) < 5)
buffer.Clear();
}
}));
}
Task.WaitAll(tasks.ToArray());
// Console.WriteLine(next);
// Console.WriteLine(buffer.ChunkCount);
}