iothub/service/src/Common/InternalBufferManager.cs (519 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.Devices.Common
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Runtime.InteropServices;
using System.Collections.Concurrent;
internal abstract class InternalBufferManager
{
/// <summary>
/// Base constructor. It is protected and the class is abstract, so only the
/// derived buffer managers can be instantiated.
/// </summary>
protected InternalBufferManager()
{
}
/// <summary>
/// Obtains a byte array for a request of <paramref name="bufferSize"/> bytes.
/// </summary>
/// <param name="bufferSize">The number of bytes the caller needs.</param>
/// <returns>
/// A buffer supplied by the concrete manager. Note that the preallocated
/// implementation in this file returns null when the request exceeds its
/// largest buffer size or the matching pool is empty.
/// </returns>
public abstract byte[] TakeBuffer(int bufferSize);
/// <summary>
/// Hands a buffer back to the manager so that it may be reused.
/// </summary>
/// <param name="buffer">The buffer being returned.</param>
public abstract void ReturnBuffer(byte[] buffer);
/// <summary>
/// Releases whatever buffers the manager is currently holding on to.
/// </summary>
public abstract void Clear();
/// <summary>
/// Factory that selects the buffer manager implementation for the given limits.
/// </summary>
/// <param name="maxBufferPoolSize">Total bytes that may be pooled; zero disables pooling.</param>
/// <param name="maxBufferSize">Largest buffer size the pool should serve.</param>
/// <param name="isTransportBufferPool">
/// True selects the preallocated (pinned) manager, false selects the self-tuning pooled manager.
/// </param>
/// <returns>The shared GC-backed manager when pooling is disabled, otherwise a new manager.</returns>
public static InternalBufferManager Create(long maxBufferPoolSize, int maxBufferSize, bool isTransportBufferPool)
{
    // A pool size of zero means "do not pool": plain allocations, reclaimed by the GC.
    if (maxBufferPoolSize == 0)
    {
        return GCBufferManager.Value;
    }

    Fx.Assert(maxBufferPoolSize > 0 && maxBufferSize >= 0, "bad params, caller should verify");

    if (!isTransportBufferPool)
    {
        return new PooledBufferManager(maxBufferPoolSize, maxBufferSize);
    }

    return new PreallocatedBufferManager(maxBufferPoolSize, maxBufferSize);
}
/// <summary>
/// Allocates a fresh byte array of the requested size.
/// </summary>
/// <remarks>
/// Kept as a single funnel for buffer allocations so they can be debugged or
/// instrumented in one place; it is expected to be inlined in retail builds.
/// </remarks>
/// <param name="size">Length of the array to allocate.</param>
/// <returns>A new array of <paramref name="size"/> bytes.</returns>
public static byte[] AllocateByteArray(int size) => new byte[size];
private class PreallocatedBufferManager : InternalBufferManager
{
private int maxBufferSize;
private int medBufferSize;
private int smallBufferSize;
private byte[][] buffersList;
private GCHandle[] handles;
private ConcurrentStack<byte[]> freeSmallBuffers;
private ConcurrentStack<byte[]> freeMedianBuffers;
private ConcurrentStack<byte[]> freeLargeBuffers;
/// <summary>
/// Preallocates and pins three equally sized pools of large, medium and small buffers.
/// </summary>
/// <remarks>
/// The memory budget is split in three. The medium buffer size is a quarter and the
/// small buffer size a sixteenth of <paramref name="maxBufferSize"/>. According to the
/// original author's note, the defaults are maxMemoryToPool = 48MB and maxBufferSize = 64KB,
/// which yields 256 buffers of 64KB, 1024 buffers of 16KB and 4096 buffers of 4KB.
/// A tier whose buffer size works out to zero (maxBufferSize below 16) gets no buffers
/// instead of failing with a division by zero.
/// </remarks>
/// <param name="maxMemoryToPool">Total number of bytes to preallocate across the three pools.</param>
/// <param name="maxBufferSize">Size in bytes of the largest buffer tier.</param>
internal PreallocatedBufferManager(long maxMemoryToPool, int maxBufferSize)
{
    this.maxBufferSize = maxBufferSize;
    this.medBufferSize = maxBufferSize / 4;
    this.smallBufferSize = maxBufferSize / 16;

    long eachPoolSize = maxMemoryToPool / 3;
    long numLargeBuffers = GetBufferCount(eachPoolSize, this.maxBufferSize);
    long numMedBuffers = GetBufferCount(eachPoolSize, this.medBufferSize);
    long numSmallBuffers = GetBufferCount(eachPoolSize, this.smallBufferSize);
    long numBuffers = numLargeBuffers + numMedBuffers + numSmallBuffers;

    this.buffersList = new byte[numBuffers][];
    this.handles = new GCHandle[numBuffers];
    this.freeSmallBuffers = new ConcurrentStack<byte[]>();
    this.freeMedianBuffers = new ConcurrentStack<byte[]>();
    this.freeLargeBuffers = new ConcurrentStack<byte[]>();

    // The tiers are laid out back to back in buffersList/handles: large, medium, small.
    int nextIndex = 0;
    nextIndex = PreallocatePool(this.freeLargeBuffers, this.maxBufferSize, numLargeBuffers, nextIndex);
    nextIndex = PreallocatePool(this.freeMedianBuffers, this.medBufferSize, numMedBuffers, nextIndex);
    PreallocatePool(this.freeSmallBuffers, this.smallBufferSize, numSmallBuffers, nextIndex);
}

// Number of buffers of the given size that fit in poolBytes; zero when the size is not positive.
private static long GetBufferCount(long poolBytes, int bufferSize)
{
    return bufferSize > 0 ? poolBytes / bufferSize : 0;
}

// Allocates and pins 'count' buffers of 'bufferSize' bytes starting at 'startIndex',
// pushes them on 'pool' and returns the index following the last buffer written.
private int PreallocatePool(ConcurrentStack<byte[]> pool, int bufferSize, long count, int startIndex)
{
    int index = startIndex;
    for (long created = 0; created < count; created++, index++)
    {
        byte[] buffer = new byte[bufferSize];
        this.buffersList[index] = buffer;
        this.handles[index] = GCHandle.Alloc(buffer, GCHandleType.Pinned);
        pool.Push(buffer);
    }

    return index;
}
/// <summary>
/// Pops a preallocated buffer from the smallest tier that can hold the request.
/// </summary>
/// <param name="bufferSize">Number of bytes the caller needs.</param>
/// <returns>
/// A pooled buffer, or null when the request is larger than the biggest tier
/// or the selected tier has no free buffer left.
/// </returns>
public override byte[] TakeBuffer(int bufferSize)
{
    if (bufferSize > this.maxBufferSize)
    {
        return null;
    }

    ConcurrentStack<byte[]> tier;
    if (bufferSize <= this.smallBufferSize)
    {
        tier = this.freeSmallBuffers;
    }
    else if (bufferSize <= this.medBufferSize)
    {
        tier = this.freeMedianBuffers;
    }
    else
    {
        tier = this.freeLargeBuffers;
    }

    // TryPop leaves the out value at null when the stack is empty.
    byte[] pooled;
    tier.TryPop(out pooled);
    return pooled;
}
/// <summary>
/// Puts a buffer back on the free stack of the tier that matches its length.
/// The buffer must have been acquired via a call to TakeBuffer.
/// </summary>
/// <param name="buffer">The buffer being returned to the pool.</param>
public override void ReturnBuffer(byte[] buffer)
{
    int length = buffer.Length;

    ConcurrentStack<byte[]> tier;
    if (length <= this.smallBufferSize)
    {
        tier = this.freeSmallBuffers;
    }
    else if (length <= this.medBufferSize)
    {
        tier = this.freeMedianBuffers;
    }
    else
    {
        tier = this.freeLargeBuffers;
    }

    tier.Push(buffer);
}
/// <summary>
/// Unpins and drops every preallocated buffer and empties the free stacks.
/// </summary>
/// <remarks>
/// Safe to call more than once: after the first call the buffer list is null,
/// so later calls only clear the (already empty) free stacks.
/// </remarks>
public override void Clear()
{
    byte[][] pinnedBuffers = this.buffersList;
    if (pinnedBuffers != null)
    {
        for (int i = 0; i < pinnedBuffers.Length; i++)
        {
            // Freeing a handle that is not allocated throws, so check first.
            if (this.handles[i].IsAllocated)
            {
                this.handles[i].Free();
            }

            pinnedBuffers[i] = null;
        }

        this.buffersList = null;
    }

    this.freeSmallBuffers.Clear();
    this.freeMedianBuffers.Clear();
    this.freeLargeBuffers.Clear();
}
}
private class PooledBufferManager : InternalBufferManager
{
private const int minBufferSize = 128;
private const int maxMissesBeforeTuning = 8;
private const int initialBufferCount = 1;
private readonly object tuningLock;
private int[] bufferSizes;
private BufferPool[] bufferPools;
private long remainingMemory;
private bool areQuotasBeingTuned;
private int totalMisses;
/// <summary>
/// Builds one pool per buffer size, starting at minBufferSize and doubling
/// until <paramref name="maxBufferSize"/> is reached (the last size is capped to it).
/// </summary>
/// <param name="maxMemoryToPool">Memory budget in bytes shared by all pools.</param>
/// <param name="maxBufferSize">Largest buffer size that gets its own pool.</param>
public PooledBufferManager(long maxMemoryToPool, int maxBufferSize)
{
    this.tuningLock = new object();
    this.remainingMemory = maxMemoryToPool;

    var pools = new List<BufferPool>();
    var sizes = new List<int>();

    int currentSize = minBufferSize;
    while (true)
    {
        // Each pool starts with at most initialBufferCount slots, fewer if the budget is exhausted.
        long affordable = this.remainingMemory / currentSize;
        int slots = (int)Math.Min(affordable, (long)initialBufferCount);

        BufferPool pool = BufferPool.CreatePool(currentSize, slots);
        pools.Add(pool);
        sizes.Add(pool.BufferSize);
        this.remainingMemory -= (long)slots * currentSize;

        if (currentSize >= maxBufferSize)
        {
            break;
        }

        // Double in 64-bit arithmetic so the cap is applied before any int overflow.
        currentSize = (int)Math.Min((long)currentSize * 2, (long)maxBufferSize);
    }

    this.bufferPools = pools.ToArray();
    this.bufferSizes = sizes.ToArray();
}
/// <summary>
/// Empties every pool owned by this manager.
/// </summary>
public override void Clear()
{
    foreach (BufferPool pool in this.bufferPools)
    {
        pool.Clear();
    }
}
/// <summary>
/// Replaces <paramref name="bufferPool"/> with a new pool whose limit is changed by
/// <paramref name="delta"/>, moving pooled buffers across and updating the remaining budget.
/// </summary>
/// <param name="bufferPool">The pool slot to replace.</param>
/// <param name="delta">Change to apply to the pool's limit.</param>
private void ChangeQuota(ref BufferPool bufferPool, int delta)
{
    BufferPool previous = bufferPool;
    int resizedLimit = previous.Limit + delta;
    BufferPool replacement = BufferPool.CreatePool(previous.BufferSize, resizedLimit);

    // Move over as many buffers as the new limit allows; stop early once the old pool is drained.
    int moved = 0;
    while (moved < resizedLimit)
    {
        byte[] pooled = previous.Take();
        if (pooled == null)
        {
            break;
        }

        replacement.Return(pooled);
        replacement.IncrementCount();
        moved++;
    }

    this.remainingMemory -= previous.BufferSize * delta;
    bufferPool = replacement;
}
/// <summary>
/// Shrinks the limit of <paramref name="bufferPool"/> by one buffer.
/// </summary>
private void DecreaseQuota(ref BufferPool bufferPool) => ChangeQuota(ref bufferPool, -1);
/// <summary>
/// Finds the pool with the most unused capacity, measured in bytes between its peak and its limit.
/// </summary>
/// <returns>Index of that pool, or -1 when no pool has a peak below its limit.</returns>
private int FindMostExcessivePool()
{
    int richestIndex = -1;
    long largestSurplus = 0;

    for (int poolIndex = 0; poolIndex < this.bufferPools.Length; poolIndex++)
    {
        BufferPool candidate = this.bufferPools[poolIndex];
        if (candidate.Peak >= candidate.Limit)
        {
            continue;
        }

        long surplus = (candidate.Limit - candidate.Peak) * (long)candidate.BufferSize;
        if (surplus > largestSurplus)
        {
            largestSurplus = surplus;
            richestIndex = poolIndex;
        }
    }

    return richestIndex;
}
/// <summary>
/// Finds the pool that missed the most bytes, considering only pools whose peak equals their limit.
/// </summary>
/// <returns>Index of that pool, or -1 when no such pool recorded a miss.</returns>
private int FindMostStarvedPool()
{
    int poorestIndex = -1;
    long largestShortfall = 0;

    for (int poolIndex = 0; poolIndex < this.bufferPools.Length; poolIndex++)
    {
        BufferPool candidate = this.bufferPools[poolIndex];
        if (candidate.Peak != candidate.Limit)
        {
            continue;
        }

        long shortfall = candidate.Misses * (long)candidate.BufferSize;
        if (shortfall > largestShortfall)
        {
            largestShortfall = shortfall;
            poorestIndex = poolIndex;
        }
    }

    return poorestIndex;
}
/// <summary>
/// Locates the first pool whose buffer size is at least <paramref name="desiredBufferSize"/>.
/// </summary>
/// <param name="desiredBufferSize">Requested buffer size in bytes.</param>
/// <returns>The matching pool, or null when the request exceeds every pooled size.</returns>
private BufferPool FindPool(int desiredBufferSize)
{
    int slot = 0;
    while (slot < this.bufferSizes.Length && this.bufferSizes[slot] < desiredBufferSize)
    {
        slot++;
    }

    if (slot == this.bufferSizes.Length)
    {
        return null;
    }

    return this.bufferPools[slot];
}
/// <summary>
/// Grows the limit of <paramref name="bufferPool"/> by one buffer.
/// </summary>
private void IncreaseQuota(ref BufferPool bufferPool) => ChangeQuota(ref bufferPool, 1);
/// <summary>
/// Returns a buffer to the pool that matches its length.
/// </summary>
/// <remarks>
/// Buffers larger than every pooled size are ignored. A buffer that falls into a
/// pool's range but is not exactly that pool's size is rejected with an argument exception.
/// </remarks>
/// <param name="buffer">The buffer being returned; must not be null.</param>
public override void ReturnBuffer(byte[] buffer)
{
    Fx.Assert(buffer != null, "caller must verify");

    BufferPool owner = FindPool(buffer.Length);
    if (owner == null)
    {
        return;
    }

    if (owner.BufferSize != buffer.Length)
    {
        throw Fx.Exception.Argument(nameof(buffer), CommonResources.BufferIsNotRightSizeForBufferManager);
    }

    // Only count the buffer when the pool actually kept it.
    bool accepted = owner.Return(buffer);
    if (accepted)
    {
        owner.IncrementCount();
    }
}
/// <summary>
/// Takes a buffer from the matching pool, allocating a new one when the pool is empty
/// or no pool covers the requested size.
/// </summary>
/// <param name="bufferSize">Requested size in bytes; must be non-negative.</param>
/// <returns>
/// A buffer of the matching pool's size, or of exactly <paramref name="bufferSize"/>
/// bytes when no pool covers the request.
/// </returns>
public override byte[] TakeBuffer(int bufferSize)
{
    Fx.Assert(bufferSize >= 0, "caller must ensure a non-negative argument");

    BufferPool pool = FindPool(bufferSize);
    if (pool == null)
    {
        return InternalBufferManager.AllocateByteArray(bufferSize);
    }

    byte[] pooled = pool.Take();
    if (pooled != null)
    {
        pool.DecrementCount();
        return pooled;
    }

    // A miss is only recorded once the pool has peaked at its limit;
    // enough recorded misses trigger a quota tuning pass.
    if (pool.Peak == pool.Limit)
    {
        pool.Misses++;
        if (++totalMisses >= maxMissesBeforeTuning)
        {
            TuneQuotas();
        }
    }

    return InternalBufferManager.AllocateByteArray(pool.BufferSize);
}
/// <summary>
/// Rebalances pool quotas: grows the most starved pool by one buffer, first taking a
/// buffer of quota from the pool with the most unused capacity when the remaining
/// budget is too small, and then resets the miss statistics.
/// </summary>
/// <remarks>
/// Only one caller tunes at a time. The lock guards only the hand-over of the
/// <c>areQuotasBeingTuned</c> flag; the tuning itself runs outside the lock. The flag is
/// cleared in a finally block so that an exception during tuning does not disable
/// all future tuning passes.
/// </remarks>
private void TuneQuotas()
{
    if (this.areQuotasBeingTuned)
    {
        return;
    }

    bool lockHeld = false;
    try
    {
        Monitor.TryEnter(this.tuningLock, ref lockHeld);

        // Don't bother if another thread already has the lock
        if (!lockHeld || this.areQuotasBeingTuned)
        {
            return;
        }

        this.areQuotasBeingTuned = true;
    }
    finally
    {
        if (lockHeld)
        {
            Monitor.Exit(this.tuningLock);
        }
    }

    try
    {
        // find the "poorest" pool
        int starvedIndex = FindMostStarvedPool();
        if (starvedIndex >= 0)
        {
            BufferPool starvedBufferPool = this.bufferPools[starvedIndex];

            if (this.remainingMemory < starvedBufferPool.BufferSize)
            {
                // find the "richest" pool
                int excessiveIndex = FindMostExcessivePool();
                if (excessiveIndex >= 0)
                {
                    // steal from the richest
                    DecreaseQuota(ref this.bufferPools[excessiveIndex]);
                }
            }

            if (this.remainingMemory >= starvedBufferPool.BufferSize)
            {
                // give to the poorest
                IncreaseQuota(ref this.bufferPools[starvedIndex]);
            }
        }

        // reset statistics
        for (int i = 0; i < this.bufferPools.Length; i++)
        {
            BufferPool bufferPool = this.bufferPools[i];
            bufferPool.Misses = 0;
        }

        this.totalMisses = 0;
    }
    finally
    {
        // Always release the "tuning in progress" marker, even if tuning failed.
        this.areQuotasBeingTuned = false;
    }
}
private abstract class BufferPool
{
private int bufferSize;
private int count;
private int limit;
private int misses;
private int peak;
/// <summary>
/// Initializes the pool bookkeeping.
/// </summary>
/// <param name="bufferSize">Size in bytes of the buffers this pool holds.</param>
/// <param name="limit">Upper bound used when counting the buffers held by this pool.</param>
public BufferPool(int bufferSize, int limit)
{
    this.limit = limit;
    this.bufferSize = bufferSize;
}
/// <summary>Size in bytes of the buffers this pool holds.</summary>
public int BufferSize => this.bufferSize;
/// <summary>Upper bound on the buffer count tracked for this pool.</summary>
public int Limit => this.limit;
/// <summary>Miss counter maintained by the owning buffer manager.</summary>
public int Misses
{
    get
    {
        return this.misses;
    }

    set
    {
        this.misses = value;
    }
}
/// <summary>Highest value the tracked buffer count has reached.</summary>
public int Peak => this.peak;
/// <summary>
/// Lets the concrete pool drop its buffers, then resets the tracked count to zero.
/// </summary>
public void Clear()
{
    // Storage first, bookkeeping second.
    OnClear();
    count = 0;
}
/// <summary>
/// Lowers the tracked buffer count by one, never letting it drop below zero.
/// </summary>
public void DecrementCount()
{
    int lowered = this.count - 1;
    if (lowered < 0)
    {
        return;
    }

    this.count = lowered;
}
/// <summary>
/// Raises the tracked buffer count by one, unless that would exceed the limit,
/// and records a new peak when the count surpasses the previous one.
/// </summary>
public void IncrementCount()
{
    int raised = this.count + 1;
    if (raised > this.limit)
    {
        return;
    }

    this.count = raised;
    if (this.peak < raised)
    {
        this.peak = raised;
    }
}
/// <summary>
/// Removes one buffer from the pool's storage.
/// </summary>
/// <returns>A pooled buffer; callers in this file treat null as "nothing available".</returns>
internal abstract byte[] Take();
/// <summary>
/// Offers a buffer to the pool's storage.
/// </summary>
/// <param name="buffer">The buffer to store.</param>
/// <returns>
/// True when the pool kept the buffer; callers in this file only increment the
/// tracked count in that case.
/// </returns>
internal abstract bool Return(byte[] buffer);
/// <summary>
/// Drops the buffers held by the concrete pool. Invoked by Clear() before the tracked count is reset.
/// </summary>
internal abstract void OnClear();
/// <summary>
/// Creates the pool implementation appropriate for the given buffer size.
/// </summary>
/// <remarks>
/// To avoid many buffer drops during training of large objects which get allocated
/// on the LOH, buffers of 85000 bytes or more use the LargeBufferPool. There is a
/// 12 or 24 (x64) byte overhead for an array, so 85000 - 24 = 84976 is used as the limit.
/// Smaller buffers use the SynchronizedBufferPool where it is compiled in (NET451).
/// On other targets they fall back to the LargeBufferPool instead of throwing, because
/// the smallest pool created by the manager is below the threshold and would otherwise
/// make the manager impossible to construct.
/// </remarks>
/// <param name="bufferSize">Size in bytes of the buffers held by the pool.</param>
/// <param name="limit">Maximum number of buffers the pool may hold.</param>
/// <returns>A new, empty pool.</returns>
internal static BufferPool CreatePool(int bufferSize, int limit)
{
#if NET451
    if (bufferSize < 84976)
    {
        return new SynchronizedBufferPool(bufferSize, limit);
    }
#endif
    return new LargeBufferPool(bufferSize, limit);
}
#if NET451
/// <summary>
/// Pool that delegates buffer storage to a <c>SynchronizedPool</c> sized to the limit.
/// </summary>
class SynchronizedBufferPool : BufferPool
{
    SynchronizedPool<byte[]> innerPool;

    internal SynchronizedBufferPool(int bufferSize, int limit)
        : base(bufferSize, limit)
    {
        innerPool = new SynchronizedPool<byte[]>(limit);
    }

    /// <summary>Forwards to the inner pool's Take.</summary>
    internal override byte[] Take() => innerPool.Take();

    /// <summary>Forwards to the inner pool's Return.</summary>
    internal override bool Return(byte[] buffer) => innerPool.Return(buffer);

    /// <summary>Forwards to the inner pool's Clear.</summary>
    internal override void OnClear() => innerPool.Clear();
}
#endif
/// <summary>
/// Pool that keeps its buffers on a stack and serializes access by locking that stack.
/// </summary>
private class LargeBufferPool : BufferPool
{
    private Stack<byte[]> items;

    internal LargeBufferPool(int bufferSize, int limit)
        : base(bufferSize, limit)
    {
        this.items = new Stack<byte[]>(limit);
    }

    // The stack instance doubles as the lock object.
    private object ThisLock => this.items;

    /// <summary>Pops the most recently returned buffer, or yields null when the stack is empty.</summary>
    internal override byte[] Take()
    {
        lock (ThisLock)
        {
            return this.items.Count > 0 ? this.items.Pop() : null;
        }
    }

    /// <summary>Stores the buffer unless the stack already holds Limit buffers.</summary>
    /// <returns>True when the buffer was stored, false when the pool was full.</returns>
    internal override bool Return(byte[] buffer)
    {
        lock (ThisLock)
        {
            if (this.items.Count >= this.Limit)
            {
                return false;
            }

            this.items.Push(buffer);
            return true;
        }
    }

    /// <summary>Removes every buffer from the stack.</summary>
    internal override void OnClear()
    {
        lock (ThisLock)
        {
            this.items.Clear();
        }
    }
}
}
}
/// <summary>
/// Non-pooling manager: every request allocates a new array and returned
/// buffers are simply left for the garbage collector.
/// </summary>
private class GCBufferManager : InternalBufferManager
{
    private static GCBufferManager singleton = new GCBufferManager();

    private GCBufferManager()
    {
    }

    /// <summary>The single shared instance.</summary>
    public static GCBufferManager Value => singleton;

    /// <summary>Allocates a new array of exactly <paramref name="bufferSize"/> bytes.</summary>
    public override byte[] TakeBuffer(int bufferSize) => InternalBufferManager.AllocateByteArray(bufferSize);

    /// <summary>Does nothing; the GC will reclaim the buffer.</summary>
    public override void ReturnBuffer(byte[] buffer)
    {
    }

    /// <summary>Does nothing; this manager holds no buffers.</summary>
    public override void Clear()
    {
    }
}
}
}