AdlsDotNetSDK/FileTransfer/RandomDataStream.cs (148 lines of code) (raw):

using System;
using System.IO;
using System.Security.Cryptography;
using System.Threading;

namespace Microsoft.Azure.DataLake.Store.FileTransfer
{
    /// <summary>
    /// Read-only stream that serves data out of a single, process-wide block of random bytes.
    /// The block is generated once and stored twice back-to-back, so any window of up to
    /// <see cref="InternalBufferSize"/> bytes starting at <c>position % InternalBufferSize</c>
    /// can be handed out without wrapping.
    /// A stream length of -1 means the stream is unbounded.
    /// </summary>
    internal class RandomDataStream : Stream
    {
        /// <summary>Sentinel stored in <see cref="streamLength"/> when the stream has no end.</summary>
        private const long UnboundedLength = -1;

        /// <summary>Number of distinct random bytes; also the largest chunk served per call.</summary>
        private static readonly int size = 1024 * 1024 * 16;

        /// <summary>The random block duplicated back-to-back (2 * size bytes).</summary>
        private static readonly byte[] internalBuffer = new byte[size * 2];

        /// <summary>Logical read position; accessed through <see cref="Interlocked"/>.</summary>
        long cursor = 0;

        /// <summary>Logical stream length, or <see cref="UnboundedLength"/>.</summary>
        long streamLength = UnboundedLength;

        /// <summary>
        /// Fills the shared buffer: <c>size</c> random bytes, copied into both halves.
        /// </summary>
        static RandomDataStream()
        {
            byte[] random = new byte[size];
            // The generator is only needed here, so release it as soon as the bytes are produced.
            using (RandomNumberGenerator prng = RandomNumberGenerator.Create())
            {
                prng.GetBytes(random);
            }

            Buffer.BlockCopy(random, 0, internalBuffer, 0, size);
            Buffer.BlockCopy(random, 0, internalBuffer, size, size);
        }

        /// <summary>Creates an unbounded stream.</summary>
        internal RandomDataStream() : this(UnboundedLength)
        {
        }

        /// <summary>Creates a stream that reports and enforces the given length.</summary>
        /// <param name="length">Logical length in bytes, or -1 for an unbounded stream.</param>
        internal RandomDataStream(long length)
        {
            streamLength = length;
        }

        /// <summary>The shared backing buffer (2 * <see cref="InternalBufferSize"/> bytes).</summary>
        public byte[] InternalBuffer
        {
            get { return internalBuffer; }
        }

        /// <summary>Size of the random block, which is half of the backing buffer.</summary>
        public int InternalBufferSize
        {
            get { return size; }
        }

        /// <inheritdoc/>
        public override bool CanRead
        {
            get { return true; }
        }

        /// <inheritdoc/>
        public override bool CanSeek
        {
            get { return true; }
        }

        /// <inheritdoc/>
        public override bool CanWrite
        {
            get { return false; }
        }

        /// <summary>No-op: the stream has nothing to flush.</summary>
        public override void Flush()
        {
        }

        /// <summary>Logical length of the stream.</summary>
        /// <exception cref="NotImplementedException">The stream is unbounded.</exception>
        public override long Length
        {
            get
            {
                if (streamLength != UnboundedLength)
                {
                    return streamLength;
                }

                throw new NotImplementedException();
            }
        }

        /// <summary>
        /// Current logical position. Setting it is equivalent to
        /// <c>Seek(value, SeekOrigin.Begin)</c>, including that method's clamping.
        /// </summary>
        public override long Position
        {
            get { return Interlocked.Read(ref cursor); }
            // Must seek to the assigned value; seeking to the current position is a no-op.
            set { Seek(value, SeekOrigin.Begin); }
        }

        /// <summary>
        /// Returns a stream over <paramref name="length"/> bytes of the shared buffer that
        /// correspond to logical position <paramref name="offset"/>. Does not move the cursor.
        /// </summary>
        /// <param name="offset">Logical start position; must be non-negative.</param>
        /// <param name="length">Number of bytes, between 0 and <see cref="InternalBufferSize"/>.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="length"/> is negative or larger than the buffer size, or
        /// <paramref name="offset"/> is negative.
        /// </exception>
        public Stream GetSubstream(long offset, int length)
        {
            if (length < 0 || length > size)
            {
                throw new ArgumentOutOfRangeException(nameof(length));
            }

            if (offset < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(offset));
            }

            int bufferOffset = (int)(offset % size); // fits in an int because size is an int
            // bufferOffset < size and length <= size, so the window stays inside the 2 * size buffer.
            return new MemoryStream(internalBuffer, bufferOffset, length);
        }

        /// <summary>
        /// Atomically advances the cursor by up to <paramref name="count"/> bytes and returns a
        /// stream over the bytes that were skipped.
        /// </summary>
        /// <param name="count">Requested byte count; at most <see cref="InternalBufferSize"/>.</param>
        /// <returns>The substream, or <c>null</c> when no bytes are available.</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="count"/> exceeds the buffer size.
        /// </exception>
        public Stream GetSubstream(int count)
        {
            if (count > size)
            {
                throw new ArgumentOutOfRangeException(nameof(count));
            }

            long start;
            int granted = ReserveRange(count, out start);
            if (granted < 1)
            {
                return null;
            }

            int bufferOffset = (int)(start % size); // fits in an int because size is an int
            return new MemoryStream(internalBuffer, bufferOffset, granted);
        }

        /// <summary>
        /// Copies up to <paramref name="count"/> bytes (capped at <see cref="InternalBufferSize"/>
        /// per call) into <paramref name="buffer"/> and advances the cursor atomically.
        /// </summary>
        /// <returns>Bytes copied; 0 at end of stream or when <paramref name="count"/> is below 1.</returns>
        /// <remarks>
        /// Argument validation is left to <see cref="Buffer.BlockCopy"/>, which runs after the
        /// cursor has already been advanced.
        /// </remarks>
        public override int Read(byte[] buffer, int offset, int count)
        {
            if (count > size)
            {
                count = size;
            }

            long start;
            int granted = ReserveRange(count, out start);
            if (granted < 1)
            {
                return 0;
            }

            Buffer.BlockCopy(internalBuffer, (int)(start % size), buffer, offset, granted);
            return granted;
        }

        /// <summary>
        /// Moves the cursor. For <see cref="SeekOrigin.Begin"/> and <see cref="SeekOrigin.Current"/>
        /// the target is clamped to <c>[0, Length - 1]</c> on a bounded stream and to
        /// <c>[0, ...)</c> on an unbounded one. For <see cref="SeekOrigin.End"/> the move happens
        /// only on a bounded stream with <c>-Length &lt; offset &lt;= 0</c>; otherwise the cursor
        /// is left unchanged.
        /// </summary>
        /// <returns>The resulting position (the current position when nothing moved).</returns>
        public override long Seek(long offset, SeekOrigin origin)
        {
            long limit = streamLength;
            long newCursor = Interlocked.Read(ref cursor);
            long observed;

            switch (origin)
            {
                case SeekOrigin.Begin:
                    // Store the logical position; the modulo by size is applied only when
                    // indexing into the shared buffer.
                    newCursor = ClampPosition(offset, limit);
                    Interlocked.Exchange(ref cursor, newCursor);
                    break;

                case SeekOrigin.Current:
                    // cursor = clamp(cursor + offset), retried until no other thread interfered.
                    do
                    {
                        observed = Interlocked.Read(ref cursor);
                        newCursor = ClampPosition(observed + offset, limit);
                    }
                    while (Interlocked.CompareExchange(ref cursor, newCursor, observed) != observed);
                    break;

                case SeekOrigin.End:
                    // "offset > -limit" is "-offset < limit" without the long.MinValue overflow.
                    if (offset <= 0 && limit != UnboundedLength && offset > -limit)
                    {
                        newCursor = limit + offset;
                        Interlocked.Exchange(ref cursor, newCursor);
                    }
                    break;
            }

            return newCursor;
        }

        /// <summary>
        /// Sets the logical length when <paramref name="value"/> is positive (other values are
        /// ignored), then pulls the cursor back if it lies beyond the length of a bounded stream.
        /// </summary>
        public override void SetLength(long value)
        {
            if (value > 0)
            {
                streamLength = value;
            }

            long limit = streamLength;
            if (limit == UnboundedLength)
            {
                // Nothing to clamp against; comparing with the -1 sentinel would push the cursor negative.
                return;
            }

            long ceiling = limit < 0 ? 0 : limit;
            long observed = Interlocked.Read(ref cursor);
            while (observed > ceiling)
            {
                long seen = Interlocked.CompareExchange(ref cursor, ceiling, observed);
                if (seen == observed)
                {
                    break;
                }

                observed = seen;
            }
        }

        /// <summary>Not supported: the stream is read-only.</summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }

        /// <summary>
        /// Atomically claims up to <paramref name="count"/> bytes starting at the current cursor.
        /// </summary>
        /// <param name="count">Requested byte count.</param>
        /// <param name="start">Cursor value at which the claimed range begins.</param>
        /// <returns>Number of bytes claimed; 0 when nothing is available or requested.</returns>
        private int ReserveRange(int count, out long start)
        {
            while (true)
            {
                start = Interlocked.Read(ref cursor);
                long limit = streamLength;
                long granted = count;

                if (limit != UnboundedLength)
                {
                    long remaining = limit - start;
                    if (remaining < granted)
                    {
                        granted = remaining;
                    }
                }

                if (granted < 1)
                {
                    return 0;
                }

                if (Interlocked.CompareExchange(ref cursor, start + granted, start) == start)
                {
                    return (int)granted; // granted <= count, so it fits in an int
                }
            }
        }

        /// <summary>
        /// Clamps a seek target to <c>[0, limit - 1]</c> for a bounded stream, or to
        /// non-negative values for an unbounded one.
        /// </summary>
        private static long ClampPosition(long position, long limit)
        {
            if (limit != UnboundedLength && position > limit - 1)
            {
                position = limit - 1;
            }

            return position < 0 ? 0 : position;
        }
    }
}