AdlsDotNetSDK/QueueTools/QueueWrapper.cs (45 lines of code) (raw):
using System.Collections.Generic;
using System.Threading;
namespace Microsoft.Azure.DataLake.Store.QueueTools
{
internal class QueueWrapper<T> where T:class
{
private readonly Queue<T> _dirList;
private const int InitialCapacity = 10240;
private int _isWaiting;
private int TotalThreads { get; }
internal QueueWrapper(int numThreads)
{
TotalThreads = numThreads;
_dirList = new Queue<T>(InitialCapacity);
}
internal void Add(T dir)
{
lock (_dirList)
{
_dirList.Enqueue(dir);
Monitor.Pulse(_dirList);
}
}
internal T Poll()
{
lock (_dirList)
{
if (_dirList.Count == 0)
{
_isWaiting++;
if (_isWaiting == TotalThreads) //All threads are waiting
{
return null;
}
while (_dirList.Count == 0)
{
Monitor.Wait(_dirList);
}
_isWaiting--;
}
return _dirList.Dequeue();
}
}
}
}