AdlsDotNetSDK/AdlsOutputStream.cs (338 lines of code) (raw):

using NLog; using System; using System.IO; using System.Net; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.DataLake.Store.RetryPolicies; namespace Microsoft.Azure.DataLake.Store { /// <summary> /// ADLS Output stream that writes data to a file on Data lake. It writes data to a buffer and when the buffer gets filled, writes data in bulk to server /// Data can be written asynchronously/synchronously. Write is fully synchronous till the transport layer. WriteAsync is fully asynchronous till the transport layer. /// AdlsOutputStream is not threadsafe since it uses buffer (maintains state so not stateless). /// </summary> public class AdlsOutputStream : Stream { /// <summary> /// Logger to log messages related to output stream /// </summary> private static readonly Logger OutStreamLog = LogManager.GetLogger("adls.dotnet.OutputStream"); /// <summary> /// Full path of the file /// </summary> private string Filename { get; } /// <summary> /// ADLS client /// </summary> private AdlsClient Client { get; } /// <summary> /// String containing the lease ID, when a client obtains a lease on a file no other client can make edits to the file /// </summary> private string LeaseId { get; } /// <summary> /// Internal buffer where client writes, when it gets filled up or we do flush/dispose then only we write buffer to server /// </summary> private byte[] Buffer { get; set; } /// <summary> /// Internal buffer pool if passed. We will take a byte array of 4 mb from there and then return it to them after close /// </summary> private AdlsArrayPool<byte> _bufferPool; /// <summary> /// Capacity of the internal buffer. Check CopyFileJob.cs before changing this threshold. /// </summary> internal static int BufferMaxCapacity = 4 * 1024 * 1024; internal static int BufferMinCapacity = 1 * 1024 * 1024; /// <summary> /// Number of bytes written to the server /// </summary> private int _bufferCapacity; /// <summary> /// Pointer in the buffer till which data is written /// </summary> private int BufferSize { get; set; } /// <summary> /// Pointer in file till which data is written, i.e. length of file /// </summary> private long FilePointer { get; set; } /// <summary> /// Whether metadata is synced (data is flushed). Syncing metadata is expensive so we do not want to do it unless /// there has been appends with SYNFLAG.DATA since last append with SYNCFLAG.METADA. /// </summary> private bool MetaDataSynced { get; set; } /// <summary> /// Whether the stream is disposed /// </summary> private bool _isDisposed; /// <summary> /// Stream cannot read data /// </summary> public override bool CanRead => false; /// <summary> /// Stream cannot seek data /// </summary> public override bool CanSeek => false; /// <summary> /// Stream can write data /// </summary> public override bool CanWrite => true; /// <summary> /// Not supported /// </summary> public override long Length => throw new NotSupportedException(); /// <summary> /// Set is not supported. Gets the position where next data will be written /// </summary> public override long Position { get => FilePointer + BufferSize; set => throw new NotSupportedException(); } private AdlsOutputStream(string filename, AdlsClient client, string leaseId, AdlsArrayPool<byte> bufferPool, int bufferCapacity) { Filename = filename; Client = client; LeaseId = string.IsNullOrEmpty(leaseId) ? Guid.NewGuid().ToString() : leaseId; BufferSize = 0; if(bufferCapacity > BufferMaxCapacity) { throw new Exception($"BufferCApacity is too big. Maximum Capacity: {BufferMaxCapacity}"); } else if(bufferCapacity < BufferMinCapacity) { throw new Exception($"BufferCApacity is too small. Minimum Capacity: {BufferMinCapacity}"); } _bufferCapacity = bufferCapacity; if (bufferPool != null) { _bufferPool = bufferPool; } else { Buffer = new byte[_bufferCapacity]; } } protected AdlsOutputStream() { } internal static async Task<AdlsOutputStream> GetAdlsOutputStreamAsync(string filename, AdlsClient client, bool isNew, string leaseId, AdlsArrayPool<byte> bufferPool, int bufferCapacity) { var adlsOpStream = new AdlsOutputStream(filename, client, leaseId, bufferPool, bufferCapacity); await adlsOpStream.InitializeFileSizeAsync(isNew).ConfigureAwait(false); if (OutStreamLog.IsTraceEnabled) { OutStreamLog.Trace($"ADLFileOutputStream, Created for client {client.ClientId} for file {filename}, create={isNew}"); } return adlsOpStream; } /// <summary> /// If buffer pool is apssed then rent from the pool if buffer is released from the alst flush /// </summary> private async Task CreateBufferIfNotInitializedAsync() { if(_bufferPool != null && Buffer == null) { Buffer = await _bufferPool.RentAsync(_bufferCapacity).ConfigureAwait(false); if (Buffer == null || Buffer.Length < _bufferCapacity) { throw new OutOfMemoryException($"Could not rent a buffer of size {_bufferCapacity}"); } } } /// <summary> /// If buffer is rented from bufferpool, then release the buffer to buffer pool while flush is called /// Else buffer is released only when disposing is true /// </summary> /// <param name="disposing"></param> private async Task ReleaseBufferIfInitializedAsync(bool disposing = false) { if (_bufferPool != null) { if (Buffer != null) { await _bufferPool.ReturnAsync(Buffer, true).ConfigureAwait(false); Buffer = null; } } else if (disposing) { Buffer = null; } } /// <summary> /// Initialize the file size by doing a getfilestatus if we are creating in append mode /// </summary> /// <param name="isNew">True if we are creating the file else false</param> /// <returns></returns> private async Task InitializeFileSizeAsync(bool isNew) { if (isNew) { FilePointer = 0; } else { OperationResponse resp = new OperationResponse(); //Initialize the filepointer to the current length of file // Pass getConsistentlength so that we get the updated length of stream DirectoryEntry diren = await Core.GetFileStatusAsync(Filename, UserGroupRepresentation.ObjectID, Client, new RequestOptions(Client.GetPerRequestTimeout(), new ExponentialRetryPolicy()), resp, default(CancellationToken), true).ConfigureAwait(false); if (diren == null) { throw Client.GetExceptionFromResponse(resp, "Error in getting metadata while creating InputStream for file " + Filename + "."); } FilePointer = diren.Length; } } /// <summary> /// Asynchronously flushes data from buffer to server and updates the metadata /// </summary> /// <param name="cancelToken">Cancellation token</param> public override async Task FlushAsync(CancellationToken cancelToken) { if (_isDisposed) { throw new ObjectDisposedException("Stream is closed"); } // TODO test double flush await WriteServiceAsync(SyncFlag.METADATA, cancelToken).ConfigureAwait(false); await ReleaseBufferIfInitializedAsync().ConfigureAwait(false); } /// <summary> /// Synchronously flushes data from buffer to server and updates the metadata /// </summary> public override void Flush() { if (_isDisposed) { throw new ObjectDisposedException("Stream is closed"); } // TODO test double flush WriteService(SyncFlag.METADATA); ReleaseBufferIfInitializedAsync().GetAwaiter().GetResult(); } /// <summary> /// Not supported /// </summary> /// <param name="buffer"></param> /// <param name="offset"></param> /// <param name="count"></param> /// <returns></returns> public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); } /// <summary> /// Not supported /// </summary> /// <param name="offset"></param> /// <param name="origin"></param> /// <returns></returns> public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); } /// <summary> /// Not supported /// </summary> /// <param name="value"></param> public override void SetLength(long value) { throw new NotSupportedException(); } /// <summary> /// Copies data to internal buffer, and updates the internal buffer pointer /// </summary> /// <param name="buffer">Input byte array</param> /// <param name="offset">Offset in the byte array</param> /// <param name="count">length to copy</param> private void AddDataToBuffer(byte[] buffer, int offset, int count) { System.Buffer.BlockCopy(buffer, offset, Buffer, BufferSize, count); BufferSize += count; } /// <summary> /// Verifies write arguments /// </summary> /// <param name="buffer">Byte buffer</param> /// <param name="offset">Offset</param> /// <param name="count">Count</param> private void WriteVerify(byte[] buffer, int offset, int count) { if (buffer == null) { throw new ArgumentNullException(nameof(buffer)); } if (buffer != null && (offset >= buffer.Length || (offset < 0) || (count + offset > buffer.Length))) { throw new ArgumentOutOfRangeException(nameof(offset)); } if (_isDisposed) { throw new ObjectDisposedException("Stream is disposed"); } if (OutStreamLog.IsTraceEnabled) { OutStreamLog.Trace($"ADLFileOutputStream, Stream write of size {count} for file {Filename} for client {Client.ClientId}"); } } /// <summary> /// Writes data to internal buffer. If the buffer fills up then writes to the file in server. /// Does it asynchronously /// </summary> /// <param name="buffer">Input byte array containing the Data to write</param> /// <param name="offset">Offset in buffer</param> /// <param name="count">Count of bytes to write</param> /// <param name="cancelToken">Cancellation token</param> public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancelToken) { WriteVerify(buffer, offset, count); await CreateBufferIfNotInitializedAsync().ConfigureAwait(false); //If putting data in buffer will overflow the buffer if (BufferSize + count > _bufferCapacity) { //If data size is less than 4MB then we should gurantee that the write to be atomic if (count <= _bufferCapacity) { await WriteServiceAsync(SyncFlag.DATA, cancelToken).ConfigureAwait(false); } else { //Else we just continue writing data to server until the left data is less than buffer size while (BufferSize + count > _bufferCapacity) { int toCopy = _bufferCapacity - BufferSize; AddDataToBuffer(buffer, offset, toCopy);//Adds data to buffer to write to server count -= toCopy; offset += toCopy; await WriteServiceAsync(SyncFlag.DATA, cancelToken).ConfigureAwait(false);//Writes the buffer to server } } } AddDataToBuffer(buffer, offset, count); } /// <summary> /// Writes data to internal buffer. If the buffer fills up then writes to the file in server. /// Does it synchronously /// </summary> /// <param name="buffer">Input byte array containing the Data to write</param> /// <param name="offset">Offset in buffer</param> /// <param name="count">Count of bytes to write</param> public override void Write(byte[] buffer, int offset, int count) { WriteVerify(buffer, offset, count); CreateBufferIfNotInitializedAsync().GetAwaiter().GetResult(); //If putting data in buffer will overflow the buffer if (BufferSize + count > _bufferCapacity) { //If data size is less than 4MB then we should gurantee that the write to be atomic if (count <= _bufferCapacity) { WriteService(SyncFlag.DATA); } else { //Else we just continue writing data to server until the left data is less than buffer size while (BufferSize + count > _bufferCapacity) { int toCopy = _bufferCapacity - BufferSize; AddDataToBuffer(buffer, offset, toCopy);//Adds data to buffer to write to server count -= toCopy; offset += toCopy; WriteService(SyncFlag.DATA);//Writes the buffer to server } } } AddDataToBuffer(buffer, offset, count); } /// <summary> /// Releases the unmanaged resources used by the Stream and optionally releases the managed resources /// </summary> /// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources</param> protected override void Dispose(bool disposing) { if (_isDisposed) { return; } try { if (disposing) { // TODO test for dispose after flush //Based on Dispose Pattern reference objects shouldnt be touched during finalizer WriteService(SyncFlag.CLOSE); } } finally { BufferSize = 0; FilePointer = 0; if (disposing) { ReleaseBufferIfInitializedAsync(true).GetAwaiter().GetResult(); _isDisposed = true; if (OutStreamLog.IsTraceEnabled) { OutStreamLog.Trace($"ADLFileOutputStream, Stream closed for file {Filename} for client {Client.ClientId}"); } } base.Dispose(disposing); } } /// <summary> /// Makes a Append call to server to write data in buffer. Resets the FilePointer and BufferSize. /// This is an asynchronous call. /// </summary> /// <param name="flag">Type of append- It is just data or to update metadata or close the lease </param> /// <param name="cancelToken">Cancellation token</param> /// <returns></returns> private async Task WriteServiceAsync(SyncFlag flag, CancellationToken cancelToken) { if (BufferSize == 0 && (flag == SyncFlag.DATA || (MetaDataSynced && flag == SyncFlag.METADATA))) { return; } if (OutStreamLog.IsTraceEnabled) { OutStreamLog.Trace($"ADLFileOutputStream, Stream flush of size {BufferSize} at offset {FilePointer} for file {Filename} for client {Client.ClientId}"); } OperationResponse resp = new OperationResponse(); await Core.AppendAsync(Filename, LeaseId, LeaseId, flag, FilePointer, Buffer, 0, BufferSize, Client, new RequestOptions(Client.GetPerRequestTimeout(), new ExponentialRetryPolicy()), resp, cancelToken).ConfigureAwait(false); if (!resp.IsSuccessful) { // if this was a retry and we get bad offset, then this might be because we got a transient // failure on first try, but request succeeded on back-end. In that case, the retry would fail // with bad offset. To detect that, we check if there was a retry done, and if the current error we // have is bad offset. // If so, do a zero-length append at the current expected Offset, and if that succeeds, // then the file length must be good - swallow the error. If this append fails, then the last append // did not succeed and we have some other offset on server - bubble up the error. if (resp.Retries > 0 && resp.HttpStatus == HttpStatusCode.BadRequest && resp.RemoteExceptionName.Equals("BadOffsetException")) { bool zeroAppendIsSuccesful = await PerformZeroLengthAppendAsync(FilePointer + BufferSize, flag, cancelToken).ConfigureAwait(false); if (zeroAppendIsSuccesful) { if (OutStreamLog.IsDebugEnabled) { OutStreamLog.Debug($"ADLFileOutputStream, Zero size Append succeded and the expected FileSize is {FilePointer + BufferSize}, ignoring BadOffsetException for session {LeaseId} for file {Filename} for client {Client.ClientId}"); } FilePointer += BufferSize; BufferSize = 0; MetaDataSynced = flag == SyncFlag.METADATA; return; } if (OutStreamLog.IsDebugEnabled) { OutStreamLog.Debug($"ADLFileOutputStream, Append failed at offset {FilePointer} for session {LeaseId} for file {Filename} for client {Client.ClientId}"); } } throw Client.GetExceptionFromResponse(resp, $"Error in appending for file {Filename} at offset {FilePointer}."); } MetaDataSynced = flag == SyncFlag.METADATA;//Make sure if metadata is already updated, then do not try to update metada again unless data has been written FilePointer += BufferSize;//Update the filepointer as data is written to server BufferSize = 0;//Resets buffersize since all data in buffer is flushed to server } /// <summary> /// Makes a Append call to server to write data in buffer. Resets the FilePointer and BufferSize. /// This is a synchronous call. /// </summary> /// <param name="flag">Type of append- It is just data or to update metadata or close the lease </param> /// <returns></returns> private void WriteService(SyncFlag flag) { if (BufferSize == 0 && (flag == SyncFlag.DATA || MetaDataSynced && flag == SyncFlag.METADATA)) { return; } if (OutStreamLog.IsTraceEnabled) { OutStreamLog.Trace($"ADLFileOutputStream, Stream flush of size {BufferSize} at offset {FilePointer} for file {Filename} for client {Client.ClientId}"); } OperationResponse resp = new OperationResponse(); Core.Append(Filename, LeaseId, LeaseId, flag, FilePointer, Buffer, 0, BufferSize, Client, new RequestOptions(Client.GetPerRequestTimeout(), new ExponentialRetryPolicy()), resp); if (!resp.IsSuccessful) { // if this was a retry and we get bad offset, then this might be because we got a transient // failure on first try, but request succeeded on back-end. In that case, the retry would fail // with bad offset. To detect that, we check if there was a retry done, and if the current error we // have is bad offset. // If so, do a zero-length append at the current expected Offset, and if that succeeds, // then the file length must be good - swallow the error. If this append fails, then the last append // did not succeed and we have some other offset on server - bubble up the error. if (resp.Retries > 0 && resp.HttpStatus == HttpStatusCode.BadRequest && resp.RemoteExceptionName.Equals("BadOffsetException")) { bool zeroAppendIsSuccesful = PerformZeroLengthAppend(FilePointer + BufferSize, flag); if (zeroAppendIsSuccesful) { if (OutStreamLog.IsDebugEnabled) { OutStreamLog.Debug($"ADLFileOutputStream, Zero size Append succeded and the expected FileSize is {FilePointer + BufferSize}, ignoring BadOffsetException for session {LeaseId} for file {Filename} for client {Client.ClientId}"); } FilePointer += BufferSize; BufferSize = 0; MetaDataSynced = flag == SyncFlag.METADATA; return; } if (OutStreamLog.IsDebugEnabled) { OutStreamLog.Debug($"ADLFileOutputStream, Append failed at offset {FilePointer} for session {LeaseId} for file {Filename} for client {Client.ClientId}"); } } throw Client.GetExceptionFromResponse(resp, $"Error in appending for file {Filename} at offset {FilePointer}."); } MetaDataSynced = flag == SyncFlag.METADATA;//Make sure if metadata is already updated, then do not try to update metada again unless data has been written FilePointer += BufferSize;//Update the filepointer as data is written to server BufferSize = 0;//Resets buffersize since all data in buffer is flushed to server } /// <summary> /// Performs append of zero length to see whether the file is in consistent state with the client. /// This is an asynchronous operation. /// </summary> /// <param name="offsetFile">Offset in file at which the append will be made</param> /// <param name="syncFlag">The syncflag of the original append</param> /// <param name="cancelToken">Cancellation token</param> /// <returns></returns> private async Task<bool> PerformZeroLengthAppendAsync(long offsetFile, SyncFlag syncFlag, CancellationToken cancelToken) { OperationResponse resp = new OperationResponse(); await Core.AppendAsync(Filename, LeaseId, LeaseId, syncFlag, offsetFile, null, -1, 0, Client, new RequestOptions(Client.GetPerRequestTimeout(), new ExponentialRetryPolicy()), resp, cancelToken).ConfigureAwait(false); return resp.IsSuccessful; } /// <summary> /// Performs append of zero length to see whether the file is in consistent state with the client. /// This is a synchronous operation. /// </summary> /// <param name="offsetFile">Offset in file at which the append will be made</param> /// <param name="syncFlag">The syncflag of the original append</param> /// <returns></returns> private bool PerformZeroLengthAppend(long offsetFile, SyncFlag syncFlag) { OperationResponse resp = new OperationResponse(); Core.Append(Filename, LeaseId, LeaseId, syncFlag, offsetFile, null, -1, 0, Client, new RequestOptions(Client.GetPerRequestTimeout(), new ExponentialRetryPolicy()), resp); return resp.IsSuccessful; } } }