AdlsDotNetSDK/AclTools/AclProcessor.cs (336 lines of code) (raw):

using Microsoft.Azure.DataLake.Store.Acl;
using Microsoft.Azure.DataLake.Store.AclTools.Jobs;
using Microsoft.Azure.DataLake.Store.QueueTools;
using NLog;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Runtime.ExceptionServices;
using System.Threading;

namespace Microsoft.Azure.DataLake.Store.AclTools
{
    /// <summary>
    /// Types of ACL operations
    /// </summary>
    public enum RequestedAclType
    {
        /// <summary>
        /// Adds new Acl entries
        /// </summary>
        ModifyAcl,
        /// <summary>
        /// Resets the acl entries
        /// </summary>
        SetAcl,
        /// <summary>
        /// Removes the specified ACL entries
        /// </summary>
        RemoveAcl
    }

    /// <summary>
    /// Recursively applies an Acl change (modify, set or remove) starting from a root path using a pool of worker threads.
    /// Also used for Acl verification (internal), which checks whether such a change was applied correctly.
    /// </summary>
    internal class AclProcessor
    {
        private static readonly Logger AclLog = LogManager.GetLogger("adls.dotnet.AclTool");
        private static readonly Logger AclJobLog = LogManager.GetLogger("adls.dotnet.AclTool.Job");

        /// <summary>
        /// Interval in milliseconds between two progress reports of the stats thread
        /// </summary>
        private const int StatsReportIntervalMs = 5000;

        private readonly string _inputPath;
        /// <summary>
        /// Adls Client
        /// </summary>
        internal AdlsClient Client { get; }
        /// <summary>
        /// Acl entries that should be used for Acl modify/set/remove
        /// </summary>
        internal List<AclEntry> AclEntries { get; }
        /// <summary>
        /// AclEntries without the default Acl entries since default does not apply for files
        /// </summary>
        internal List<AclEntry> FileAclEntries { get; }
        /// <summary>
        /// Priority Queue that queues the Acl jobs
        /// </summary>
        internal PriorityQueueWrapper<BaseJob> Queue { get; }
        // Verify flag- when set verifies whether acls have been changed correctly
        private readonly bool _isVerify;
        /// <summary>
        /// Client exception if any raised by any thread
        /// </summary>
        private Exception _clientException;
        /// <summary>
        /// Number of threads that process the Acl
        /// </summary>
        private int NumThreads { get; }
        private readonly CancellationToken _cancelToken;
        private readonly IProgress<AclProcessorStats> _aclStatusTracker;
        /// <summary>
        /// Set once all the worker threads are done - used by the stat collection thread to stop.
        /// An event (rather than a flag polled after a sleep) lets the stats thread exit immediately.
        /// </summary>
        private readonly ManualResetEventSlim _consumerDone = new ManualResetEventSlim(false);
        /// <summary>
        /// Stat collecting threads
        /// </summary>
        private Thread _threadStats;
        /// <summary>
        /// Array of thread workers
        /// </summary>
        private readonly Thread[] _threadWorker;
        /// <summary>
        /// Mutex Lock object for doing synchronized setting and getting the clientexception
        /// </summary>
        private readonly Object _thisLock = new Object();
        /// <summary>
        /// Job type-Set Acl or Modify Acl or Remove Acl or ACl set verify or Acl modify verify or Acl remove Verify
        /// </summary>
        internal RequestedAclType Type { get; }
        /// <summary>
        /// Total Files processed, this is updated after enumeration
        /// </summary>
        private long _filesEnumerated;
        /// <summary>
        /// Total Directories processed, this is updated after enumeration
        /// </summary>
        private long _directoryEnumerated;
        /// <summary>
        /// Number of directories incorrectly processed
        /// </summary>
        private long _incorrectDirectoryCount;
        /// <summary>
        /// Number of files incorrectly processed
        /// </summary>
        private long _incorrectFileCount;

        private readonly string _incorrectVerifyFile;
        private readonly StreamWriter _incorrectVerifyFileStream;
        private readonly QueueWrapper<string> _incorrectFileList;
        private readonly bool _ignoreVerifyTimeErrors;
        private readonly ConcurrentBag<string> _linkPaths;
        /// <summary>
        /// thread to dump incorrect files in a file
        /// </summary>
        private Thread _threadDumpIncorrectFiles;

        private AclProcessor(string path, AdlsClient client, List<AclEntry> aclEntries, RequestedAclType type, int threadCount, IProgress<AclProcessorStats> aclStatusTracker, CancellationToken cancelToken, bool verify = false, string verifyFile = null, bool ignoreVerifyTimeErrors = false)
        {
            _inputPath = path;
            Client = client;
            NumThreads = threadCount <= 0 ? AdlsClient.DefaultNumThreads : threadCount;
            Queue = new PriorityQueueWrapper<BaseJob>(NumThreads);
            _threadWorker = new Thread[NumThreads];
            if (aclEntries == null || aclEntries.Count == 0)
            {
                throw new ArgumentException("Input acl is null or empty");
            }
            AclEntries = aclEntries;
            // Files only receive the access-scope entries
            FileAclEntries = new List<AclEntry>(AclEntries.Count);
            foreach (var entry in AclEntries)
            {
                if (entry.Scope == AclScope.Access)
                {
                    FileAclEntries.Add(entry);
                }
            }
            if (FileAclEntries.Count == 0 && AclLog.IsDebugEnabled)
            {
                AclLog.Debug("AclEntries for file are empty so input acl must be containing default acls");
            }
            Type = type;
            _isVerify = verify;
            _aclStatusTracker = aclStatusTracker;
            _cancelToken = cancelToken;
            // If verify file is passed we have to setup a thread and a filestream to write to the file
            if (verify && !string.IsNullOrEmpty(verifyFile))
            {
                _ignoreVerifyTimeErrors = ignoreVerifyTimeErrors;
                _incorrectVerifyFile = verifyFile;
                _incorrectFileList = new QueueWrapper<string>(-1);
                Utils.CreateParentDirectory(_incorrectVerifyFile);
                // FileMode.Create truncates an existing file; OpenOrCreate would overwrite from the start
                // but leave stale trailing lines from an earlier, longer run
                _incorrectVerifyFileStream = new StreamWriter(new FileStream(_incorrectVerifyFile, FileMode.Create, FileAccess.ReadWrite))
                {
                    AutoFlush = true
                };
            }
            _linkPaths = new ConcurrentBag<string>();
            if (AclLog.IsDebugEnabled)
            {
                AclLog.Debug($"AclProcessor, Name: {_inputPath}, Threads: {NumThreads}, AclChangeType: {Type}, InputAcl: {string.Join(":", AclEntries)}{(_isVerify ? ", RunInVerifyMode" : string.Empty)}");
            }
        }

        /// <summary>
        /// Atomically sets the client exception. Only the first exception is kept.
        /// </summary>
        /// <param name="ex">Exception raised by a thread</param>
        private void SetException(Exception ex)
        {
            lock (_thisLock)
            {
                if (_clientException == null)
                {
                    _clientException = ex;
                }
            }
        }

        /// <summary>
        /// Atomically gets the client exception
        /// </summary>
        /// <returns>The first exception recorded by any thread, or null</returns>
        private Exception GetException()
        {
            lock (_thisLock)
            {
                return _clientException;
            }
        }

        /// <summary>
        /// Api to call Acl Processor. Runs Acl Processor and returns the results.
        /// </summary>
        /// <param name="path">Root path from where the Acl recursive processor will start</param>
        /// <param name="client">ADLS Client</param>
        /// <param name="aclEntries">Acl Entries to change</param>
        /// <param name="type">Type of Acl Job: Acl modify or Acl set or acl remove</param>
        /// <param name="threadCount">Custom number of threads</param>
        /// <param name="aclStatus">Status of progress</param>
        /// <param name="cancelToken">Cancellationtoken</param>
        /// <returns>Number of files and directories processed</returns>
        internal static AclProcessorStats RunAclProcessor(string path, AdlsClient client, List<AclEntry> aclEntries, RequestedAclType type, int threadCount = -1, IProgress<AclProcessorStats> aclStatus = null, CancellationToken cancelToken = default(CancellationToken))
        {
            return new AclProcessor(path, client, aclEntries, type, threadCount, aclStatus, cancelToken).ProcessAcl();
        }

        /// <summary>
        /// Internal test Api to verify Acl Processor. Runs Acl verifier and returns the number of files and directories processed
        /// and the number of files and directories found incorrectly processed.
        /// </summary>
        /// <param name="path">Root path from where the Acl recursive verifier will start</param>
        /// <param name="client">ADLS Client</param>
        /// <param name="aclEntries">Acl Entries to verify</param>
        /// <param name="type">Type of Acl Job: Acl modify verify or Acl set verify or acl remove verify</param>
        /// <param name="threadCount">Custom number of threads</param>
        /// <param name="verifyFile">Verification file</param>
        /// <param name="ignoreError">If passed true, then we will ignore the error and dump the error in verifyFile. Pass this true only if verifyFile is not null</param>
        /// <param name="statusTracker">Status Tracker</param>
        /// <param name="cancelToken">Cancel Token</param>
        /// <returns>Processed and incorrectly processed counts of files and directories</returns>
        internal static AclProcessorStats RunAclVerifier(string path, AdlsClient client, List<AclEntry> aclEntries, RequestedAclType type, int threadCount = -1, string verifyFile = null, bool ignoreError = false, IProgress<AclProcessorStats> statusTracker = null, CancellationToken cancelToken = default(CancellationToken))
        {
            return new AclProcessor(path, client, aclEntries, type, threadCount, statusTracker, cancelToken, true, verifyFile, ignoreError)
                .ProcessAcl();
        }

        /// <summary>
        /// Builds the result statistics from the current counters. Incorrect counts are only reported in verify mode.
        /// </summary>
        private AclProcessorStats CreateStats()
        {
            return _isVerify
                ? new AclProcessorStats(_filesEnumerated, _directoryEnumerated, _incorrectFileCount, _incorrectDirectoryCount, _linkPaths)
                : new AclProcessorStats(_filesEnumerated, _directoryEnumerated, 0, 0, _linkPaths);
        }

        /// <summary>
        /// Releases the verify file stream opened by the constructor. Only called on paths where the dump thread,
        /// which normally disposes it, is never started.
        /// </summary>
        private void DisposeIncorrectVerifyFileStream()
        {
            _incorrectVerifyFileStream?.Dispose();
        }

        /// <summary>
        /// Starts the Acl Processor threads. Returns the results or throws any exceptions.
        /// </summary>
        /// <returns>Acl Processor: Number of files and directories processed or ACl Verification: Number of files and directories processed and number of files and directories incorrectly processed by Acl Processor</returns>
        private AclProcessorStats ProcessAcl()
        {
            if (_cancelToken.IsCancellationRequested)
            {
                // The dump thread never runs in this case, so release the verify file here
                DisposeIncorrectVerifyFileStream();
                return CreateStats();
            }
            //Create the threads
            for (int i = 0; i < NumThreads; i++)
            {
                _threadWorker[i] = new Thread(Run)
                {
                    Name = "Thread: " + i
                };
            }
            if (_aclStatusTracker != null)
            {
                _threadStats = new Thread(StatsRun)
                {
                    Name = "StatsThread"
                };
            }
            if (!string.IsNullOrEmpty(_incorrectVerifyFile))
            {
                _threadDumpIncorrectFiles = new Thread(VerifyFileDumpRun)
                {
                    Name = "Verify Dump Thread"
                };
            }
            // Put the first entry to queue
            try
            {
                DirectoryEntry dir = Client.GetDirectoryEntry(_inputPath);
                ProcessDirectoryEntry(dir);
            }
            catch
            {
                // No thread has been started yet, so nothing else will close the verify file
                DisposeIncorrectVerifyFileStream();
                throw;
            }
            // Start the threads
            for (int i = 0; i < NumThreads; i++)
            {
                _threadWorker[i].Start();
            }
            if (_aclStatusTracker != null)
            {
                _threadStats.Start();
            }
            if (!string.IsNullOrEmpty(_incorrectVerifyFile))
            {
                _threadDumpIncorrectFiles.Start();
            }
            //Join the threads
            for (int i = 0; i < NumThreads; i++)
            {
                _threadWorker[i].Join();
            }
            if (_aclStatusTracker != null)
            {
                // Wakes the stats thread immediately instead of letting it finish its current wait interval
                _consumerDone.Set();
                _threadStats.Join();
                _consumerDone.Dispose();
            }
            if (!string.IsNullOrEmpty(_incorrectVerifyFile))
            {
                // Signify the end of the queue
                _incorrectFileList.Add(null);
                _threadDumpIncorrectFiles.Join();
            }
            Exception exception = GetException();
            if (exception != null)
            {
                // Rethrow the worker's exception on the calling thread while preserving its original stack trace
                ExceptionDispatchInfo.Capture(exception).Throw();
            }
            return CreateStats();
        }

        /// <summary>
        /// Queues the jobs for a directory entry. For a directory an enumerate job is queued and the directory count is incremented.
        /// For a file nothing is queued if there are no file acl entries, otherwise the file count is incremented.
        /// In both cases a change-acl job (or a verify job in verify mode) is then queued for the entry.
        /// </summary>
        /// <param name="dir">Directory entry to process</param>
        internal void ProcessDirectoryEntry(DirectoryEntry dir)
        {
            if (dir.Type == DirectoryEntryType.DIRECTORY)
            {
                if (AclLog.IsDebugEnabled)
                {
                    AclLog.Debug($"Enumerate job submitted for: {dir.FullName}");
                }
                Queue.Add(new EnumerateDirectoryChangeAclJob(this, dir.FullName));
                Interlocked.Increment(ref _directoryEnumerated);
            }
            else
            {
                // If the input only contains default acl then the FileAclEntries will be empty
                if (FileAclEntries.Count == 0)
                {
                    return;
                }
                Interlocked.Increment(ref _filesEnumerated);
            }
            if (AclLog.IsDebugEnabled)
            {
                AclLog.Debug($"{(_isVerify ? "VerifyAcl" : "ChangeAcl")} job submitted for: {dir.FullName}");
            }
            if (_isVerify)
            {
                Queue.Add(new VerifyChangeAclJob(this, dir.FullName, dir.Type));
            }
            else
            {
                Queue.Add(new ChangeAclJob(this, dir.FullName, dir.Type));
            }
        }

        /// <summary>
        /// Increments the incorrect count of files or directories and, when a verify file is used, queues the path for dumping
        /// </summary>
        /// <param name="type">Type of Directory entry</param>
        /// <param name="fullPath">Path</param>
        /// <param name="error">error, appended to the path after a comma when non-empty</param>
        internal void IncrementIncorrectCount(DirectoryEntryType type, string fullPath, string error = null)
        {
            // If dumping the incorrect files to a file then put in queue
            if (!string.IsNullOrEmpty(_incorrectVerifyFile))
            {
                if (!string.IsNullOrEmpty(error))
                {
                    fullPath += "," + error;
                }
                _incorrectFileList.Add(fullPath);
            }
            if (type == DirectoryEntryType.DIRECTORY)
            {
                Interlocked.Increment(ref _incorrectDirectoryCount);
            }
            else
            {
                Interlocked.Increment(ref _incorrectFileCount);
            }
        }

        /// <summary>
        /// Adds path with Link attribute set
        /// </summary>
        /// <param name="fullPath">Path</param>
        internal void AddLinkPath(string fullPath)
        {
            _linkPaths.Add(fullPath);
        }

        /// <summary>
        /// Delegate method run by stats thread. Reports progress every <see cref="StatsReportIntervalMs"/> milliseconds
        /// until the workers are done.
        /// </summary>
        private void StatsRun()
        {
            while (!_consumerDone.IsSet)
            {
                // Interlocked.Read avoids torn 64-bit reads on 32-bit platforms while workers are incrementing
                _aclStatusTracker.Report(new AclProcessorStats(Interlocked.Read(ref _filesEnumerated), Interlocked.Read(ref _directoryEnumerated), 0, 0, _linkPaths));
                // Returns early as soon as the workers are done
                _consumerDone.Wait(StatsReportIntervalMs);
            }
        }

        /// <summary>
        /// Delegate method run by the verify dump thread. Writes queued incorrect paths to the verify file until the
        /// null end-of-queue marker is received, then closes the file.
        /// </summary>
        private void VerifyFileDumpRun()
        {
            try
            {
                while (true)
                {
                    var entry = _incorrectFileList.Poll();
                    if (entry == null)
                    {
                        break;
                    }
                    _incorrectVerifyFileStream.WriteLine(entry);
                }
            }
            catch (Exception ex)
            {
                // An unhandled exception on this thread would terminate the process; surface it to ProcessAcl instead
                SetException(ex);
            }
            finally
            {
                _incorrectVerifyFileStream.Dispose();
            }
        }

        /// <summary>
        /// Records a job that failed during verification as incorrect, with "Exception" as the error
        /// </summary>
        /// <param name="job">Failed job</param>
        private void DumpIgnoredVerificationError(BaseJob job)
        {
            // By default if job is enumerate, type is directory
            DirectoryEntryType type = DirectoryEntryType.DIRECTORY;
            string fullName = "";
            var verifyJob = job as VerifyChangeAclJob;
            if (verifyJob != null)
            {
                type = verifyJob.EntryType;
                fullName = verifyJob.FullPath;
            }
            else
            {
                var enumerateJob = job as EnumerateDirectoryChangeAclJob;
                if (enumerateJob != null)
                {
                    fullName = enumerateJob.FullPath;
                }
            }
            IncrementIncorrectCount(type, fullName, "Exception");
        }

        /// <summary>
        /// Method run by a single thread. Polls a directory entry. If job is of type EnumerateDirectory then enumerates contents in it and queues them.
        /// If job is of type Acl Change then performs the Acl change. If job type is Acl Verify then does acl verification
        /// </summary>
        private void Run()
        {
            try
            {
                while (true)
                {
                    if (_cancelToken.IsCancellationRequested)
                    {
                        // Like the exception path, add a poison job so workers that are waiting in Poll are released too
                        Queue.Add(new PoisonJob());
                        return;
                    }
                    var job = Queue.Poll();
                    if (GetException() != null || job == null || job is PoisonJob)//Exception or Poison block (all threads are waiting)
                    {
                        Queue.Add(new PoisonJob());
                        return;
                    }
                    try
                    {
                        job.DoRun(AclJobLog);
                    }
                    catch (AdlsException ex) when (ex.HttpStatus == HttpStatusCode.NotFound)
                    {
                        //Do not stop acl processor if the file/directory is deleted
                    }
                    catch (Exception ex)
                    {
                        if (_ignoreVerifyTimeErrors)
                        {
                            DumpIgnoredVerificationError(job);
                        }
                        else
                        {
                            SetException(ex);//Sets the global exception to signal other threads to close
                            Queue.Add(new PoisonJob());//Handle corner cases like when exception is raised other threads can be in wait state
                            return;
                        }
                    }
                }
            }
            catch (Exception e)
            {
                // This should never come here, but just in case of SynchronizationLockException at least thread will exit dutifully
                if (AclLog.IsDebugEnabled)
                {
                    AclLog.Debug("Unexpected error: " + e.Message);
                }
            }
        }
    }
}