AdlsDotNetSDK/QueueTools/PriorityQueueWrapper.cs (60 lines of code) (raw):
using System;
using System.Threading;
namespace Microsoft.Azure.DataLake.Store.QueueTools
{
internal class PriorityQueueWrapper<T> where T : IComparable
{
private readonly PriorityQueue<T> _queue;
private readonly int _totalThreads;
private int _waitingThreads = 0;
internal PriorityQueueWrapper(int numThreads):this()
{
_totalThreads = numThreads;
}
internal PriorityQueueWrapper()
{
_queue = new PriorityQueue<T>();
_totalThreads = -1;
}
internal PriorityQueueWrapper(int capacity,int numThreads)
{
_queue = new PriorityQueue<T>(capacity);
_totalThreads = numThreads;
}
internal int Size()
{
lock (_queue)
{
return _queue.HeapSize;
}
}
internal void Add(T job)
{
lock (_queue)
{
_queue.Add(job);
Monitor.Pulse(_queue);
}
}
internal T Poll()
{
lock (_queue)
{
if (_queue.HeapSize <= 0)
{
_waitingThreads++;
if (_waitingThreads == _totalThreads) //All threads are waiting
{
return default(T);
}
while (_queue.HeapSize <= 0)
{
Monitor.Wait(_queue);
}
_waitingThreads--;
}
return _queue.GetMax();
}
}
}
}