in rd-net/Lifetimes/Threading/ByteBufferAsyncProcessor.cs [271:339]
/// <summary>
/// Processing loop: repeatedly picks the next non-empty chunk under <c>myLock</c>, hands its data
/// to <c>myProcessor</c> outside the lock, then re-takes the lock to store the resulting sequence
/// number and advance to the following chunk. The loop exits only through the state checks below.
/// Presumably this is the body of the dedicated processing thread - confirm against the code that starts it.
/// </summary>
private void ThreadProc()
{
while (true)
{
lock (myLock)
{
// Terminating (or any later state): leave immediately, even if unprocessed data remains.
if (State >= StateKind.Terminating) return;
// Sleep while there is nothing to process or while at least one pause reason is registered.
while (myAllDataProcessed || myPauseReasons.Count > 0)
{
// Stopping (or any later state) ends the loop at this point, i.e. only once we would otherwise have to wait.
if (State >= StateKind.Stopping) return;
// Releases myLock while blocked and re-acquires it before returning.
Monitor.Wait(myLock);
// Re-check after waking up: the state may have changed while the lock was released.
if (State >= StateKind.Terminating) return;
}
// If only Put requests existed, we could write Assertion.Assert(chunk.Ptr > 0, "chunk.Ptr > 0") here.
// But with Clear in play we could get "Wait + Put(full) + Clear + Put" before this line, and 'chunkToProcess' would point to an empty chunk.
// RIDER-15223
// Skip forward over chunks that CheckEmpty reports as empty.
// NOTE(review): CheckEmpty is defined elsewhere; whether it has side effects beyond the check cannot be seen from here - confirm.
while (myChunkToProcess.CheckEmpty(this)) //should never be endless, because `myAllDataProcessed` is 'false', that means that we MUST have ptr > 0 somewhere
myChunkToProcess = myChunkToProcess.Next;
// The chunk about to be processed must not also be the chunk being filled: move the fill position to the next chunk.
if (myChunkToFill == myChunkToProcess)
{
// It's possible that the next chunk is occupied by an entry with seqN > acknowledgedSeqN
GrowConditionally();
myChunkToFill = myChunkToProcess.Next;
}
ShrinkConditionally(myChunkToProcess);
// Invariants, verified only when Mode.IsAssertion is set: there is data to process, and the fill chunk is a different, not-yet-processed chunk.
if (Mode.IsAssertion) Assertion.Assert(myChunkToProcess.Ptr > 0, "chunkToProcess.Ptr > 0");
if (Mode.IsAssertion) Assertion.Assert(myChunkToFill != myChunkToProcess && myChunkToFill.IsNotProcessed, "myChunkToFill != chunkToProcess && myChunkToFill.IsNotProcessed");
// Set under the lock; cleared again in the finally block below once the processor call has finished.
myProcessing = true;
}
// From here until the finally block the lock is NOT held.
// NOTE(review): myChunkToProcess is dereferenced here without the lock, while the finally block guards against it being null - confirm whether it can be reset concurrently.
// Sequence number handed to the processor: 0 when the chunk reports IsNotProcessed, otherwise the SeqN already stored on the chunk.
long seqN = myChunkToProcess.IsNotProcessed ? 0 : myChunkToProcess.SeqN;
try
{
// The processor receives the chunk's buffer, offset 0 and Ptr as the length argument; seqN is passed by ref, so the processor may change it.
myProcessor(myChunkToProcess.Data, 0, myChunkToProcess.Ptr, ref seqN);
}
catch (Exception e)
{
// Processor failures are logged and not rethrown, so the loop keeps running.
LogLog.Error(e);
}
finally
{
lock (myLock)
{
myProcessing = false;
if (myChunkToProcess == null)
{
// Nothing to write back or advance in this case; it is only reported.
LogLog.Error($"{nameof(myChunkToProcess)} is null. State: {State}");
}
else
{
// Store the (possibly updated) sequence number on the chunk, then move on to the next one.
myChunkToProcess.SeqN = seqN;
myChunkToProcess = myChunkToProcess.Next;
// Assertion.Assert(myChunkToProcess.IsNotProcessed, "chunkToProcess.IsNotProcessed"); not true in case of reprocessing
// The next chunk holds no data: mark everything as processed so the next iteration waits (or returns when stopping).
if (myChunkToProcess.Ptr == 0)
myAllDataProcessed = true;
}
}
}
}
}