using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;
using JetBrains.Annotations;
using JetBrains.Diagnostics;
using JetBrains.Serialization;
using JetBrains.Util;
using JetBrains.Util.Internal;
#nullable disable
namespace JetBrains.Threading
{
///
/// Circular auto expandable and shrinkable byte buffer based on linked list of arrays with two roles:
///
/// - Producers - fills buffer with method
/// - Consumer - process buffer in dedicated thread
///
///
///
public unsafe class ByteBufferAsyncProcessor
{
public enum StateKind
{
Initialized,
AsyncProcessing,
Stopping,
Terminating,
Terminated
}
public delegate void Processor(byte[] data, int offset, int len, ref long seqN);
class Chunk
{
internal readonly byte[] Data;
internal int Ptr;
internal Chunk Next;
internal long SeqN;
public Chunk(int chunkSize)
{
Data = new byte[chunkSize];
Reset();
}
public bool CheckEmpty(ByteBufferAsyncProcessor buffer)
{
if (Ptr == 0)
{
if (Mode.IsAssertion) Assertion.Assert(SeqN == long.MaxValue, "SeqN == long.MaxValue, but: {0}", SeqN);
return true;
}
if (buffer.AcknowledgedSeqN < SeqN)
return false;
Reset();
return true;
}
public bool IsNotProcessed => SeqN == long.MaxValue;
internal void Reset()
{
SeqN = long.MaxValue;
Ptr = 0;
}
}
private const string LogCategory = "ByteBufferAsyncProcessor";
[PublicAPI] public string Id { get; }
private readonly Processor myProcessor;
//to be done: more granular locking for better performance
private readonly object myLock = new object();
public long AcknowledgedSeqN { get; private set; }
private const int DefaultChunkSize = 16370; // some reserve for length + seqN
private readonly ILog myLog; //use with care because Logger use AsyncProcessor itself
[PublicAPI] public readonly int ChunkSize;
private const int DefaultShrinkIntervalMs = 30000; //30 sec
private int myLastShrinkOrGrowTimeMs;
[PublicAPI] public int ShrinkIntervalMs;
private bool myAllDataProcessed = true;
public bool AllDataProcessed
{
get
{
Memory.Barrier();
return myAllDataProcessed;
}
}
private readonly HashSet myPauseReasons = new HashSet();
public StateKind State { get; private set; }
private Chunk myChunkToFill;
private bool myProcessing;
private volatile Chunk myChunkToProcess;
private Thread myAsyncProcessingThread;
public ByteBufferAsyncProcessor(string id, Processor processor)
: this(id, DefaultChunkSize, processor)
{}
public ByteBufferAsyncProcessor(string id, int chunkSize, Processor processor)
{
Id = id;
myProcessor = processor;
ChunkSize = chunkSize;
ShrinkIntervalMs = DefaultShrinkIntervalMs;
myLog = Log.GetLog().GetSublogger(Id);
// var otherChunk = new Chunk(chunkSize);
// myChunkToFill = new Chunk(chunkSize) { Next = otherChunk };
// otherChunk.Next = myChunkToFill;
Reset(chunkSize);
State = StateKind.Initialized;
}
public int ChunkCount
{
get
{
lock (myLock)
{
var chunk = myChunkToFill.Next;
int res = 1;
while (chunk != myChunkToFill)
{
res++;
chunk = chunk.Next;
}
return res;
}
}
}
#region Helpers
private void CleanupInternal()
{
lock (myLock)
{
State = StateKind.Terminated;
myChunkToFill = null;
myChunkToProcess = null;
myAllDataProcessed = true;
}
}
private bool TerminateInternal(int timeoutMs, StateKind state, string action)
{
lock (myLock)
{
if (State == StateKind.Initialized)
{
LogLog.Verbose(LogCategory, "Can't {1} '{0}', because it hasn't been started yet", Id, action);
CleanupInternal();
return true;
}
if (State >= state)
{
LogLog.Verbose(LogCategory, "Trying to {2} async processor '{0}' but it's in state '{1}'", Id, State, action);
return true;
}
State = state;
Monitor.Pulse(myLock);
}
var res = myAsyncProcessingThread.Join(timeoutMs);
if (!res)
{
LogLog.Warn($"Async processor {Id} hasn't finished in ${timeoutMs} ms. Trying to abort thread.");
#if !NETCOREAPP
if (!RuntimeInfo.IsRunningOnCore)
LogLog.Catch(() => myAsyncProcessingThread.Abort());
#endif
}
CleanupInternal();
return res;
}
//since sometimes we terminate thread via Thread.Abort() it's quite normal to catch this abort
private void ThreadProcCatchAbort()
{
try
{
ThreadProc();
}
catch (ThreadAbortException e)
{
LogLog.Info($"ByteBufferProcessor {Id} was stopped by Thread.Abort rather than by normal Stop(): {0}", e);
#if !NETCOREAPP
Thread.ResetAbort();
#endif
}
}
#endregion
#region Processing
public void Acknowledge(long seqNumber)
{
lock (myLock)
{
LogLog.Trace(LogCategory, "New acknowledged seqN: {0}", seqNumber);
if (seqNumber > AcknowledgedSeqN)
{
AcknowledgedSeqN = seqNumber;
}
else
{
//it's ok ack came 2 times for same package, because if connection lost/resume client resend package with lower number and could receive packages with lower numbers
//throw new InvalidOperationException($"Acknowledge({seqNumber}) called, while next {nameof(seqNumber)} MUST BE greater than `{AcknowledgedSeqN}`");
}
}
}
public void ReprocessUnacknowledged()
{
Assertion.Require(Thread.CurrentThread != myAsyncProcessingThread);
lock (myLock)
{
while (myProcessing)
Monitor.Wait(myLock, 1);
var chunk = myChunkToFill.Next;
while (chunk != myChunkToFill)
{
if (!chunk.CheckEmpty(this))
{
//todo forbid acknowledges that could break processing
myChunkToProcess = chunk;
myAllDataProcessed = false;
Monitor.PulseAll(myLock);
return;
}
else
chunk = chunk.Next;
}
}
}
private void ThreadProc()
{
while (true)
{
lock (myLock)
{
if (State >= StateKind.Terminating) return;
while (myAllDataProcessed || myPauseReasons.Count > 0)
{
if (State >= StateKind.Stopping) return;
Monitor.Wait(myLock);
if (State >= StateKind.Terminating) return;
}
//In case of only put requests, we could write Assertion.Assert(chunk.Ptr > 0, "chunk.Ptr > 0");
//But in case of clear, we could get "Wait + Put(full) + Clear + Put" before this line and 'chunkToProcess' will point to empty chunk.
//RIDER-15223
while (myChunkToProcess.CheckEmpty(this)) //should never be endless, because `myAllDataProcessed` is 'false', that means that we MUST have ptr > 0 somewhere
myChunkToProcess = myChunkToProcess.Next;
if (myChunkToFill == myChunkToProcess)
{
//it's possible that next chuck is occupied by entry with seqN > acknowledgedSeqN
GrowConditionally();
myChunkToFill = myChunkToProcess.Next;
}
ShrinkConditionally(myChunkToProcess);
if (Mode.IsAssertion) Assertion.Assert(myChunkToProcess.Ptr > 0, "chunkToProcess.Ptr > 0");
if (Mode.IsAssertion) Assertion.Assert(myChunkToFill != myChunkToProcess && myChunkToFill.IsNotProcessed, "myChunkToFill != chunkToProcess && myChunkToFill.IsNotProcessed");
myProcessing = true;
}
long seqN = myChunkToProcess.IsNotProcessed ? 0 : myChunkToProcess.SeqN;
try
{
myProcessor(myChunkToProcess.Data, 0, myChunkToProcess.Ptr, ref seqN);
}
catch (Exception e)
{
LogLog.Error(e);
}
finally
{
lock (myLock)
{
myProcessing = false;
if (myChunkToProcess == null)
{
LogLog.Error($"{nameof(myChunkToProcess)} is null. State: {State}");
}
else
{
myChunkToProcess.SeqN = seqN;
myChunkToProcess = myChunkToProcess.Next;
// Assertion.Assert(myChunkToProcess.IsNotProcessed, "chunkToProcess.IsNotProcessed"); not true in case of reprocessing
if (myChunkToProcess.Ptr == 0)
myAllDataProcessed = true;
}
}
}
}
}
#endregion
#region State changing API
///
/// Starts async processing of queue.
///
public void Start()
{
lock (myLock)
{
if (State != StateKind.Initialized)
{
LogLog.Verbose(LogCategory, "Trying to START async processor '{0}' but it's in state '{1}'", Id, State);
return;
}
State = StateKind.AsyncProcessing;
myAsyncProcessingThread = new Thread(ThreadProcCatchAbort) { Name = Id, IsBackground = true};
myAsyncProcessingThread.Start();
}
}
private void Reset(int chunkSize)
{
myChunkToFill = new Chunk(chunkSize);
myChunkToFill.Next = myChunkToFill;
myLastShrinkOrGrowTimeMs = Environment.TickCount;
myChunkToProcess = myChunkToFill;
}
public void Clear()
{
Assertion.Require(Thread.CurrentThread != myAsyncProcessingThread);
lock (myLock)
{
LogLog.Verbose(LogCategory, "Cleaning '{0}', state={1}", Id, State);
if (State >= StateKind.Stopping) return;
WaitProcessingFinished();
Reset(ChunkSize);
myAllDataProcessed = true;
}
}
public bool Pause([NotNull] string reason)
{
if (reason == null) throw new ArgumentNullException(nameof(reason));
lock (myLock)
{
if (State >= StateKind.Stopping) return false;
var newReasonAdded = myPauseReasons.Add(reason);
myLog.Verbose("PAUSE ('{0}') {1}:: state={2}", reason, newReasonAdded ? "": "", State);
WaitProcessingFinished();
return newReasonAdded;
}
}
private void WaitProcessingFinished()
{
if (Thread.CurrentThread == myAsyncProcessingThread) return; //don't want to deadlock
while (myProcessing)
Monitor.Wait(myLock, 1);
}
public bool Resume([NotNull] string reason)
{
if (reason == null) throw new ArgumentNullException(nameof(reason));
lock (myLock)
{
var present = myPauseReasons.Remove(reason);
var unpaused = myPauseReasons.Count == 0;
myLog.Verbose((unpaused ? "RESUME" : $"Remove pause reason('{reason}')") + $" :: state={State}");
Monitor.PulseAll(myLock);
return present;
}
}
///
/// Graceful stop. Process queue, but doesn't accept new data via . Joins processing thread for given timeout. If timeout elapsed, aborts thread.
///
/// Timeout to wait. for infinite waiting.
/// 'true' if Join(timeoutMs) was successful, false otherwise. Also returns 'false' if thread is already stopped or killed.">
public bool Stop(int timeoutMs = Timeout.Infinite)
{
return TerminateInternal(timeoutMs, StateKind.Stopping, "STOP");
}
///
/// Force stop. Doesn't process queue, doesn't accept new data via . Joins processing thread for given timeout. If timeout elapsed, aborts thread.
///
/// Timeout to wait. for infinite waiting.
/// 'true' if Join(timeoutMs) was successful, false otherwise. Also returns 'false' if thread is already stopped or killed.">
public bool Terminate(int timeoutMs = Timeout.Infinite)
{
return TerminateInternal(timeoutMs, StateKind.Terminating, "TERMINATE");
}
#endregion
#region Queue filling API
[PublicAPI] public void Put(byte[] data)
{
fixed (byte* ptr = data)
{
Put(ptr, data.Length);
}
}
[PublicAPI] public void Put(UnsafeWriter.Cookie data)
{
Put(data.Data, data.Count);
}
#if !NETCOREAPP
[System.Runtime.ExceptionServices.HandleProcessCorruptedStateExceptions]
#endif // to force myLock to be unlocked even in case of corrupted state exception
[PublicAPI] public void Put(byte* start, int count)
{
if (count <= 0) return;
lock (myLock)
{
if (State >= StateKind.Stopping) return;
// //reentrancy guard
// if (myAsyncProcessingThread == Thread.CurrentThread)
// {
// byte[] instantChunk = new byte[count];
// Marshal.Copy((IntPtr)start, instantChunk, 0, count);
// myProcessor(instantChunk, 0, count);
// return;
// }
var ptr = 0;
while (ptr < count)
{
if (Mode.IsAssertion) Assertion.Assert(myChunkToFill.IsNotProcessed);
var rest = count - ptr;
var available = ChunkSize - myChunkToFill.Ptr;
if (available > 0)
{
var copylen = Math.Min(rest, available);
Marshal.Copy((IntPtr)(start + ptr), myChunkToFill.Data, myChunkToFill.Ptr, copylen);
myChunkToFill.Ptr += copylen;
ptr += copylen;
}
else
{
GrowConditionally();
myChunkToFill = myChunkToFill.Next;
}
}
if (myAllDataProcessed) //speedup
{
myAllDataProcessed = false;
Monitor.Pulse(myLock);
}
}
}
private void GrowConditionally() //under lock
{
if (myChunkToFill.Next.CheckEmpty(this)) return;
LogLog.Trace(LogCategory, "Grow: {0} bytes", ChunkSize);
myChunkToFill.Next = new Chunk(ChunkSize) { Next = myChunkToFill.Next };
myLastShrinkOrGrowTimeMs = Environment.TickCount;
}
private void ShrinkConditionally(Chunk upTo) //under lock
{
if (Mode.IsAssertion) Assertion.Assert(myChunkToFill != upTo, "myFreeChunk != upTo");
var now = Environment.TickCount;
if (now - myLastShrinkOrGrowTimeMs <= ShrinkIntervalMs && /*overflow*/now - myLastShrinkOrGrowTimeMs >= 0 ) return;
myLastShrinkOrGrowTimeMs = now;
while (true)
{
var toRemove = myChunkToFill.Next;
if (toRemove == upTo || !toRemove.CheckEmpty(this))
break;
LogLog.Trace(LogCategory, "Shrink: {0} bytes, seqN: {1}", ChunkSize, toRemove);
myChunkToFill.Next = toRemove.Next;
}
}
#endregion
}
}