AdlsDotNetSDK/FileProperties/PropertyManager.cs (236 lines of code) (raw):

using System;
using System.IO;
using System.Net;
using System.Runtime.ExceptionServices;
using System.Threading;
using Microsoft.Azure.DataLake.Store.FileProperties.Jobs;
using Microsoft.Azure.DataLake.Store.QueueTools;
using NLog;

namespace Microsoft.Azure.DataLake.Store.FileProperties
{
    /// <summary>
    /// Walks a path with a pool of consumer threads, collecting size and/or Acl properties into a
    /// <see cref="PropertyTreeNode"/> tree, and dumps the result to a local or ADL file through a
    /// single dedicated writer thread.
    /// </summary>
    internal class PropertyManager
    {
        private static readonly Logger PropertyLog = LogManager.GetLogger("adls.dotnet.PropertyManager");
        internal static readonly Logger PropertyJobLog = LogManager.GetLogger("adls.dotnet.PropertyManager.Jobs");

        /// <summary>Client used to retrieve directory entries and, when not saving locally, to create the dump file.</summary>
        internal AdlsClient Client;

        /// <summary>Queue of jobs polled by the consumer threads.</summary>
        internal PriorityQueueWrapper<BaseJob> ConsumerQueue;

        /// <summary>
        /// Separate queue for writing property jobs. Only takes in DumpFilePropertyJob
        /// </summary>
        internal PriorityQueueWrapper<BaseJob> PropertyWriterQueue;

        private readonly int _numThreads;
        private Thread[] _threadConsumer;
        private readonly CancellationToken _cancelToken;

        /// <summary>
        /// Separate single thread to write the properties to file. You only need one thread
        /// because you are writing to one file on a disk. Remember there is no lock around the writer write.
        /// </summary>
        private Thread _threadWriter;

        // First exception raised by any worker thread; guarded by _lock.
        private Exception _excep;

        /// <summary>Root node of the property tree; created by <see cref="RunGetProperty"/>.</summary>
        internal PropertyTreeNode HeadNode;

        private readonly object _lock = new object();

        /// <summary>Maximum depth till which we want to view the properties.</summary>
        internal long MaxDepth;

        /// <summary>True if disk usage (size) columns are collected and written.</summary>
        internal bool GetSizeProperty;

        /// <summary>True if we want to display properties of files.</summary>
        internal bool DisplayFiles;

        /// <summary>True if Acl columns are collected and written.</summary>
        internal bool GetAclProperty;

        /// <summary>True only when Acl retrieval is enabled and the caller asked to hide consistent Acl trees.</summary>
        internal bool HideConsistentAclTree;

        /// <summary>True if the dump is written to a local file, false if it is written through <see cref="Client"/>.</summary>
        internal bool SaveToLocal;

        /// <summary>Name of the file containing the dump.</summary>
        internal string DumpFileName;

        /// <summary>Writer over the dump file. Disposed at the end of <see cref="RunGetProperty"/>.</summary>
        internal StreamWriter PropertyDumpWriter;

        /// <summary>For testing purposes, makes sure child nodes are not deleted after written to file.</summary>
        internal bool DontDeleteChildNodes;

        /// <summary>Column separator of the dump file. (The identifier's spelling is kept for compatibility.)</summary>
        internal const char OuputLineSeparator = '\t';

        /// <summary>
        /// Records the first exception raised by a worker thread. Later exceptions are ignored so the
        /// original failure is the one reported to the caller.
        /// </summary>
        private void SetException(Exception ex)
        {
            lock (_lock)
            {
                if (_excep == null)
                {
                    _excep = ex;
                }
            }
        }

        /// <summary>Returns the recorded worker exception, or null if none has been recorded.</summary>
        private Exception GetException()
        {
            lock (_lock)
            {
                return _excep;
            }
        }

        /// <summary>
        /// Initializes the manager, opens the dump file (local or through <paramref name="client"/>) and
        /// writes the header row.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="saveFileName"/> is null, empty or whitespace.</exception>
        private PropertyManager(AdlsClient client, bool getAclProperty, bool getDiskUsage, string saveFileName, bool saveToLocal, int numThreads, bool displayFiles, bool hideConsistentAcl, long maxDepth, CancellationToken cancelToken = default(CancellationToken))
        {
            Client = client;
            SaveToLocal = saveToLocal;
            if (string.IsNullOrWhiteSpace(saveFileName))
            {
                throw new ArgumentNullException(nameof(saveFileName));
            }

            DumpFileName = saveFileName;
            GetSizeProperty = getDiskUsage;
            GetAclProperty = getAclProperty;
            // Non-positive thread counts fall back to the client's default
            _numThreads = numThreads <= 0 ? AdlsClient.DefaultNumThreads : numThreads;
            DisplayFiles = displayFiles;
            // Hiding consistent Acl trees only makes sense when Acls are retrieved at all
            HideConsistentAclTree = GetAclProperty && hideConsistentAcl;
            MaxDepth = maxDepth;
            ConsumerQueue = new PriorityQueueWrapper<BaseJob>(_numThreads);
            PropertyWriterQueue = new PriorityQueueWrapper<BaseJob>();
            _cancelToken = cancelToken;

            Stream underlyingStream;
            if (saveToLocal)
            {
                Utils.CreateParentDirectory(DumpFileName);
                underlyingStream = new FileStream(DumpFileName, FileMode.Create, FileAccess.ReadWrite);
            }
            else
            {
                underlyingStream = client.CreateFile(DumpFileName, IfExists.Overwrite);
            }

            PropertyDumpWriter = new StreamWriter(underlyingStream);
            WriteHeader();
        }

        /// <summary>
        /// Writes the header row of the dump file. The columns present depend on
        /// <see cref="GetSizeProperty"/>, <see cref="GetAclProperty"/>, <see cref="DisplayFiles"/> and
        /// <see cref="HideConsistentAclTree"/>.
        /// </summary>
        private void WriteHeader()
        {
            string output = "";
            if (GetSizeProperty)
            {
                output = $"Total size of direct child files and directories{OuputLineSeparator}Total number of direct files{OuputLineSeparator}Total number of direct directories{OuputLineSeparator}Total size{OuputLineSeparator}Total number of files{OuputLineSeparator}Total number of directories";
            }

            if (GetAclProperty)
            {
                // If DisplayFiles is false that means HideConsistentAcl has to be false (there is a check at entry point in client).
                // And if DisplayFiles is false consistentAcl information is not correct since acl of files is not known
                bool showAclConsistentColumn = DisplayFiles || HideConsistentAclTree;
                output += $"{(string.IsNullOrEmpty(output) ? "" : $"{OuputLineSeparator}")}Acl Entries{(showAclConsistentColumn ? OuputLineSeparator + "Whether Acl is same for all descendants" : "")}";
            }

            lock (PropertyDumpWriter)
            {
                PropertyDumpWriter.WriteLine($"Entry name{OuputLineSeparator}Entry Type{OuputLineSeparator}{output}");
            }
        }

        /// <summary>
        /// Writes one row describing <paramref name="node"/> to the dump file.
        /// Kept in this class so that others can use it.
        /// </summary>
        /// <param name="node">Node whose path, type and selected properties are written</param>
        internal void WritePropertyTreeNodeToFile(PropertyTreeNode node)
        {
            string output = "";
            if (GetSizeProperty)
            {
                output = $"{node.DirectChildSize}{OuputLineSeparator}{node.DirectChildFiles}{OuputLineSeparator}{node.DirectChildDirec}{OuputLineSeparator}{node.TotChildSize}{OuputLineSeparator}{node.TotChildFiles}{OuputLineSeparator}{node.TotChildDirec}";
            }

            if (GetAclProperty && !node.SkipAclOutput)
            {
                bool showAclConsistentColumn = DisplayFiles || HideConsistentAclTree;
                output += $"{(string.IsNullOrEmpty(output) ? "" : $"{OuputLineSeparator}")}{string.Join("|", node.Acls.Entries)}{(showAclConsistentColumn ? $"{OuputLineSeparator}{node.AllChildSameAcl}" : "")}";
            }

            // No need to put lock around this, since it is run by one thread
            PropertyDumpWriter.WriteLine($"{node.FullPath}{OuputLineSeparator}{node.Type}{OuputLineSeparator}{output}");
        }

        /// <summary>
        /// Runs the whole property retrieval: creates the head node for <paramref name="path"/>, starts the
        /// consumer threads and the writer thread, waits for them to finish and writes the head node last.
        /// The dump writer is always disposed before returning or throwing.
        /// </summary>
        /// <param name="path">Path of the file or directory</param>
        /// <returns>The head node of the property tree</returns>
        /// <exception cref="ArgumentException">The path is a file and <see cref="DisplayFiles"/> is false.</exception>
        private PropertyTreeNode RunGetProperty(string path)
        {
            try
            {
                if (PropertyLog.IsDebugEnabled)
                {
                    PropertyLog.Debug($"FileProperty, SourcePath: {path}, GetDiskUsage: {GetSizeProperty}, GetAclProperty: {GetAclProperty}{(GetAclProperty ? $", AclConsistency: {HideConsistentAclTree}" : string.Empty)}, SaveToLocal: {SaveToLocal}");
                }

                // If the path does not exist then it will throw an exception
                var dir = Client.GetDirectoryEntry(path);
                HeadNode = new PropertyTreeNode(dir.FullName, dir.Type, dir.Length, null, DisplayFiles || GetAclProperty);
                if (dir.Type == DirectoryEntryType.FILE && !DisplayFiles)
                {
                    throw new ArgumentException("Input path is a file and DisplayFiles is false");
                }

                if (_cancelToken.IsCancellationRequested)
                {
                    return HeadNode;
                }

                ConsumerQueue.Add(new EnumerateAndGetPropertyJob(HeadNode, this));

                // Threads responsible for enumerating and retrieving properties like size and acl
                _threadConsumer = new Thread[_numThreads];
                for (int i = 0; i < _numThreads; i++)
                {
                    _threadConsumer[i] = new Thread(ConsumerRun) { Name = $"ConsumerThread-{i}" };
                    _threadConsumer[i].Start();
                }

                // Thread responsible for writing to the file
                _threadWriter = new Thread(WriterThreadRun) { Name = "WriterThread" };
                _threadWriter.Start();

                foreach (Thread consumer in _threadConsumer)
                {
                    consumer.Join();
                }

                // This will end the writer thread
                PropertyWriterQueue.Add(new PoisonJob());
                _threadWriter.Join();

                Exception failure = GetException();
                if (failure != null)
                {
                    // Rethrow the worker's exception without discarding the stack trace captured on the worker thread
                    ExceptionDispatchInfo.Capture(failure).Throw();
                }

                WritePropertyTreeNodeToFile(HeadNode);
                return HeadNode;
            }
            finally
            {
                PropertyDumpWriter.Dispose();
            }
        }

        // For testing purposes, makes sure child nodes are not deleted after written to file
        private PropertyTreeNode RunTestGetProperty(string path)
        {
            DontDeleteChildNodes = true;
            return RunGetProperty(path);
        }

        // Unit test purpose
        internal static PropertyTreeNode TestGetProperty(string path, AdlsClient client, bool getDiskUsage, bool getAclProperty, string dumpFileName, bool saveToLocal, int numThreads = -1, bool displayFiles = false, bool hideConsistentAcl = false, long maxDepth = Int64.MaxValue)
        {
            return new PropertyManager(client, getAclProperty, getDiskUsage, dumpFileName, saveToLocal, numThreads, displayFiles, hideConsistentAcl, maxDepth).RunTestGetProperty(path);
        }

        /// <summary>
        /// Dumps file property to a local or adl file
        /// </summary>
        /// <param name="path">Path of the file or directory</param>
        /// <param name="client">Adls client</param>
        /// <param name="getAclProperty">True if we want Acl usage</param>
        /// <param name="getDiskUsage">True if we want disk usage</param>
        /// <param name="dumpFileName">Filename containing the dump</param>
        /// <param name="saveToLocal">True if we want to save to local file</param>
        /// <param name="numThreads">Number of threads</param>
        /// <param name="displayFiles">True if we want to display properties of files</param>
        /// <param name="hideConsistentAcl">True if we want to view consistent acl property only</param>
        /// <param name="maxDepth">Maximum depth till which we want to view the properties</param>
        /// <param name="cancelToken">Token checked before starting and by each consumer thread between jobs</param>
        /// <returns>The head node of the property tree</returns>
        internal static PropertyTreeNode GetFileProperty(string path, AdlsClient client, bool getAclProperty, bool getDiskUsage, string dumpFileName, bool saveToLocal, int numThreads = -1, bool displayFiles = false, bool hideConsistentAcl = false, long maxDepth = Int64.MaxValue, CancellationToken cancelToken = default(CancellationToken))
        {
            return new PropertyManager(client, getAclProperty, getDiskUsage, dumpFileName, saveToLocal, numThreads, displayFiles, hideConsistentAcl, maxDepth, cancelToken).RunGetProperty(path);
        }

        /// <summary>
        /// Run method of every consumer thread. Polls jobs from <see cref="ConsumerQueue"/> and runs them
        /// until cancellation is requested, a poison job or null is polled, or an exception is recorded.
        /// Every exit path posts a <see cref="PoisonJob"/> so the remaining consumers also stop.
        /// </summary>
        private void ConsumerRun()
        {
            while (true)
            {
                if (_cancelToken.IsCancellationRequested)
                {
                    // Post a poison job like every other exit path. Otherwise a sibling consumer that is
                    // waiting inside Poll() on an empty queue would never be woken and Join() in
                    // RunGetProperty would hang (assumes Poll() blocks while the queue is empty - confirm
                    // against PriorityQueueWrapper).
                    ConsumerQueue.Add(new PoisonJob());
                    return;
                }

                var job = ConsumerQueue.Poll();
                if (GetException() != null || job == null || job is PoisonJob)
                {
                    ConsumerQueue.Add(new PoisonJob());
                    return;
                }

                try
                {
                    job.DoRun(PropertyJobLog);
                }
                catch (AdlsException ex)
                {
                    // Do not stop acl processor if the file/directory is deleted
                    if (ex.HttpStatus != HttpStatusCode.NotFound)
                    {
                        // Sets the global exception to signal other threads to close
                        SetException(ex);
                        // Handle corner cases like when exception is raised other threads can be in wait state
                        ConsumerQueue.Add(new PoisonJob());
                        return;
                    }
                }
                catch (Exception ex)
                {
                    SetException(ex);
                    // Handle corner cases like when exception is raised other threads can be in wait state
                    ConsumerQueue.Add(new PoisonJob());
                    return;
                }
            }
        }

        /// <summary>
        /// This is the run method for the thread writing the dump file. Only ONE THREAD
        /// runs this method
        /// </summary>
        private void WriterThreadRun()
        {
            while (true)
            {
                var job = PropertyWriterQueue.Poll();
                if (GetException() != null || job == null || job is PoisonJob)
                {
                    return;
                }

                try
                {
                    job.DoRun(PropertyJobLog);
                }
                catch (Exception ex)
                {
                    SetException(ex);
                    return;
                }
            }
        }
    }
}