AdlsDotNetSDK/ContentProcessor.cs (124 lines of code) (raw):

using Microsoft.Azure.DataLake.Store.QueueTools; using System; using System.Linq; using System.Net; using System.Threading; namespace Microsoft.Azure.DataLake.Store { /// <summary> /// Computes the content summary of the directory /// </summary> internal class ContentProcessor { /// <summary> /// Mutex Lock object for doing synchronized setting and getting the clientexception /// </summary> private readonly Object _thisLock = new Object(); /// <summary> /// Total number of thread workers /// </summary> internal int NumThreads; /// <summary> /// Array of thread workers /// </summary> private readonly Thread[] _threadWorker; /// <summary> /// Queue for containing the directory entries picked by the thread worker /// </summary> private readonly QueueWrapper<DirectoryEntry> _queue; /// <summary> /// Client exception if any raised by any thread /// </summary> private AdlsException _clientException; /// <summary> /// Cancellation Token /// </summary> private CancellationToken CancelToken { get; } /// <summary> /// ADLS client /// </summary> private AdlsClient Client { get; } /// <summary> /// Root path whose content summary we need /// </summary> private string RootPath { get; } //Tracks total directory count private long _directoryCount; /// <summary> /// Tracks total file count /// </summary> private long _fileCount; /// <summary> /// Tracks total size /// </summary> private long _totalBytes; /// <summary> /// internal API that gets the content summary for a path /// </summary> /// <param name="client">ADLS Client</param> /// <param name="path">Path of the directory or file</param> /// <param name="numThreads"> Number of threads</param> /// <param name="cancelToken">Cacellation Token</param> /// <returns>Content summary</returns> internal static ContentSummary GetContentSummary(AdlsClient client, string path, int numThreads=-1, CancellationToken cancelToken = default(CancellationToken)) { return new ContentProcessor(client, path,numThreads, cancelToken).GetContentSummary(); } private ContentProcessor(AdlsClient client, string path,int numThreads, CancellationToken cancelToken = default(CancellationToken)) { Client = client; CancelToken = cancelToken; NumThreads = numThreads <= 0 ? AdlsClient.DefaultNumThreads : numThreads; _threadWorker = new Thread[NumThreads]; for (int i = 0; i < NumThreads; i++) { _threadWorker[i] = new Thread(Run) { Name = "Thread-" + i }; } _queue = new QueueWrapper<DirectoryEntry>(NumThreads); RootPath = path; } /// <summary> /// Starts each thread worker. Waits for each thread worker to finish. If there was an exception throws it. /// Else returns a contentsummary /// </summary> /// <returns>Content summary-Total file count, directory count, total size</returns> private ContentSummary GetContentSummary() { _queue.Add(new DirectoryEntry(RootPath)); for (int i = 0; i < NumThreads; i++) { _threadWorker[i].Start(); } for (int i = 0; i < NumThreads; i++) { _threadWorker[i].Join(); } if (_clientException != null)//No need to lock here { throw _clientException; } return new ContentSummary(_directoryCount, _fileCount, _totalBytes, _totalBytes); } /// <summary> /// Atomically sets the client exception /// </summary> /// <param name="ex"></param> private void SetException(AdlsException ex) { lock (_thisLock) { _clientException = ex; } } /// <summary> /// Atomically gets the client exception /// </summary> /// <returns></returns> private AdlsException GetException() { lock (_thisLock) { return _clientException; } } /// <summary> /// The run method of each thread worker. It polls for a directory from the queue. Then calls listStatus for that directory. /// If it gets any sub-directory it adds it to the queue so that it can be processed again later.For each file/sub-directory it updates the file/directory/size variables /// </summary> private void Run() { while (true) { DirectoryEntry der = _queue.Poll(); //GetException should be put here because some threads might be in waiting state and come back and see exception if (GetException() != null || der == null)//der==null: Time to finish as all other threads have no entries { _queue.Add(null);//Poison block to notify other threads to close return; } if (CancelToken.IsCancellationRequested)//Check if operation is cancelled { AdlsException excep = new AdlsException("Content summary processing cancelled") { Ex = new OperationCanceledException() }; SetException(excep); _queue.Add(null); return; } try { foreach (var dir in Client.EnumerateDirectory(der.FullName)) { if (dir.Type == DirectoryEntryType.DIRECTORY) { Interlocked.Increment(ref _directoryCount); if (!(dir.Attribute != null && dir.Attribute.Any(attr => attr == DirectoryEntryAttributeType.Link))) { _queue.Add(dir); } } else { Interlocked.Increment(ref _fileCount); Interlocked.Add(ref _totalBytes, dir.Length); } } } catch (AdlsException ex) { if (ex.HttpStatus != HttpStatusCode.NotFound)//Do not stop summary if the file is deleted { SetException(ex);//Sets the global exception to signal other threads to close _queue.Add(null);//Handle corner cases like when exception is raised other threads can be in wait state return; } } } } } }