AdlsDotNetSDKUnitTest/WriteWithArrayPoolUnitTest.cs (171 lines of code) (raw):

using Microsoft.VisualStudio.TestTools.UnitTesting; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Microsoft.Azure.DataLake.Store.UnitTest { /// <summary> /// Class which creates an array of arraypools of type T /// </summary> /// <typeparam name="T">the type parameter for the class</typeparam> public class FixedStandardArrayPool<T> : AdlsArrayPool<T> { private int _numberOfRentCalled = 0; internal int NumberOfRentCalled { get { return _numberOfRentCalled; } } private int _numberOfReturnCalled = 0; internal int NumberOfReturnCalled { get { return _numberOfReturnCalled; } } /// <summary> /// Size of the each buffer array /// </summary> private readonly int bufferSize; /// <summary> /// Pool of buffer arrays that may be reused /// </summary> private readonly BlockingCollection<T[]> pool; /// <summary> /// Total number of objects created /// </summary> private int totalObjectsCreated; /// <summary> /// Initializes a new instance of the <see cref="FixedStandardArrayPool{T}"/> class. /// </summary> /// <param name="bufferSize">Size of the buffers that can be rented</param> /// <param name="concurrency">Number of concurrent rents allowed</param> public FixedStandardArrayPool(int bufferSize, int concurrency) { this.pool = new BlockingCollection<T[]>(concurrency); this.totalObjectsCreated = 0; this.bufferSize = bufferSize; } /// <summary> /// Rent a buffer from the pool. /// /// The array returned by Rent is owned by the caller of rent, but should be returned to /// the pool via a call to Return when the renter no longer needs a copy. /// </summary> /// <param name="minimumLength">ignored value</param> /// <returns>Array of type <see cref="T"/> with length <see cref="bufferSize"/></returns> public override T[] Rent(int minimumLength) { T[] buffer; Interlocked.Increment(ref _numberOfRentCalled); if (!this.pool.TryTake(out buffer)) { // Create a new one if the pool is not full lock (this) { if (totalObjectsCreated < this.pool.BoundedCapacity) { buffer = new T[this.bufferSize]; totalObjectsCreated++; } } // No new T was created, so wait for a release to happen. if (buffer == null) { buffer = this.pool.Take(); } } return buffer; } public override Task<T[]> RentAsync(int minimumLength) { return Task.FromResult(Rent(minimumLength)); } /// <summary> /// Return a buffer to the pool /// </summary> /// <param name="buffer">Buffer for return</param> /// <param name="clearArray">Should array be cleared before returning to the pool</param> public override void Return(T[] buffer, bool clearArray = false) { if (clearArray) { Array.Clear(buffer, 0, buffer.Length); } Interlocked.Increment(ref _numberOfReturnCalled); lock (this) { // If we have over allocated or returned an array which is not matching in length, then we will not store this buffer. if (this.pool.Count >= this.pool.BoundedCapacity || buffer.Length != this.bufferSize) { return; } // Otherwise Add it to Queue this.pool.Add(buffer); } } public override Task ReturnAsync(T[] array, bool clearArray = false) { Return(array, clearArray); return Task.FromResult(default(T)); } internal void Reset() { _numberOfRentCalled = 0; _numberOfReturnCalled = 0; } } [TestClass] public class WriteWithArrayPoolUnitTest { private static FixedStandardArrayPool<byte> _arrayPool; /// <summary> /// Adls Client /// </summary> private static AdlsClient _adlsClient; private static string BasePath; private static string RemotePath; [ClassInitialize] public static void SetupTest(TestContext context) { BasePath = context.Properties["BasePath"].ToString(); RemotePath = "/" + BasePath + "/ReadWriteWithArrayPoolUnitTest" + SdkUnitTest.TestId; _arrayPool = new FixedStandardArrayPool<byte>(4 * 1024 * 1024, 2); _adlsClient = SdkUnitTest.SetupSuperClient(); } [TestMethod] public void SerialCreateAndAppend() { int count = 3; string path = RemotePath + "/SerialCreateAndAppend"; int totLength = 8 * 1024 * 1024; string text1 = SdkUnitTest.RandomString(totLength); byte[] textByte1 = Encoding.UTF8.GetBytes(text1); for (int index = 0; index < count; index++) { using (var stream = _adlsClient.CreateFile(path + index, IfExists.Overwrite, _arrayPool, 4 * 1024 * 1024)) { // Flush in empty buffer, no return called stream.Flush(); // Rent called stream.Write(textByte1, 0, textByte1.Length); //Return called stream.Flush(); stream.Flush(); // Rent called stream.Write(textByte1, 0, textByte1.Length); // Rent should not be called stream.Write(textByte1, 0, textByte1.Length); } }; Assert.IsTrue(_arrayPool.NumberOfRentCalled == count * 2); Assert.IsTrue(_arrayPool.NumberOfReturnCalled == count * 2); for (int index = 0; index < count; index++) { string output = ""; using (var istream = _adlsClient.GetReadStream(path + index)) { int noOfBytes; byte[] buffer = new byte[2 * 1024 * 1024]; do { noOfBytes = istream.Read(buffer, 0, buffer.Length); output += Encoding.UTF8.GetString(buffer, 0, noOfBytes); } while (noOfBytes > 0); } Assert.IsTrue(output.Equals(text1 + text1 + text1)); }; _arrayPool.Reset(); } [TestMethod] public void ParallelCreateAndAppend() { int count = 3; string path = RemotePath + "/ParallelCreateAndAppend"; int totLength = 2 * 1024 * 1024; string text1 = SdkUnitTest.RandomString(totLength); byte[] textByte1 = Encoding.UTF8.GetBytes(text1); Parallel.For(0, count, index=> { using (var stream = _adlsClient.CreateFile(path + index, IfExists.Overwrite, _arrayPool, 4*1024*1024)) { // Flush in empty buffer, no return called stream.Flush(); // Rent called stream.Write(textByte1, 0, textByte1.Length); //Return called stream.Flush(); stream.Flush(); // Rent called stream.Write(textByte1, 0, textByte1.Length); // Rent should not be called stream.Write(textByte1, 0, textByte1.Length); } }); Assert.IsTrue(_arrayPool.NumberOfRentCalled == count * 2); Assert.IsTrue(_arrayPool.NumberOfReturnCalled == count * 2); for (int index = 0; index < count; index++) { string output = ""; using (var istream = _adlsClient.GetReadStream(path+index)) { int noOfBytes; byte[] buffer = new byte[2*1024 * 1024]; do { noOfBytes = istream.Read(buffer, 0, buffer.Length); output += Encoding.UTF8.GetString(buffer, 0, noOfBytes); } while (noOfBytes > 0); } Assert.IsTrue(output.Equals(text1 + text1 + text1)); }; _arrayPool.Reset(); } } }