AdlsDotNetSDK/MockAdlsFileSystem/MockAdlsClient.cs (802 lines of code) (raw):
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.DataLake.Store.Acl;
using Microsoft.Azure.DataLake.Store.AclTools;
using Microsoft.Azure.DataLake.Store.FileTransfer;
namespace Microsoft.Azure.DataLake.Store.MockAdlsFileSystem
{
internal struct DirectoryEntryMetaData
{
internal MemoryStream DataStream;
internal DirectoryEntry Entry;
internal AclStatus AclData;
internal DateTime CreationTime;
}
/// <summary>
/// Mock Adls Client. All the operations are done in memory. This is not a accurate immplementation of actual adlsclient. The immplementations are best effort only.
/// </summary>
public sealed class MockAdlsClient : AdlsClient
{
private readonly IDictionary<string, DirectoryEntryMetaData> _directoryEntries;
private readonly IDictionary<string, DirectoryEntryMetaData> _trashDirectoryEntries; // contains trashPath->deletedEntry mapping
private static readonly string Owner = Guid.NewGuid().ToString();
private static readonly string Group = Guid.NewGuid().ToString();
private static string accountName;
private static Random random = new Random();
private MockAdlsClient(string accountNm) : base(accountNm, 1)
{
_directoryEntries = new ConcurrentDictionary<string, DirectoryEntryMetaData>();
_trashDirectoryEntries = new ConcurrentDictionary<string, DirectoryEntryMetaData>();
// The root directory is there
CreateDirectory("/");
ModifyAclEntries("/",new List<AclEntry>(){new AclEntry(AclType.user,Owner,AclScope.Access, AclAction.All)});
}
/// <summary>
/// Factory method that returns an instance of Mock adls client
/// </summary>
/// <returns>Mock ADls Client</returns>
public static MockAdlsClient GetMockClient()
{
accountName = "test.azuredatalakestore.net";
return new MockAdlsClient(accountName);
}
/// <summary>
/// Factory method that returns an instance of Mock adls client
/// </summary>
/// <returns>Mock ADls Client</returns>
public static MockAdlsClient GetMockClient(string account)
{
return new MockAdlsClient(account);
}
// Creates an entry for a new file or directory.
private DirectoryEntryMetaData CreateMetaData(string entryName,DirectoryEntryType type,string octalPermission)
{
var entry = new DirectoryEntry(entryName)
{
Permission = octalPermission ?? "770",
LastAccessTime = DateTime.UtcNow,
LastModifiedTime = DateTime.UtcNow,
Type = type,
ExpiryTime = null,
User = Owner,
Group= Group
};
var aclStatus = new AclStatus(new List<AclEntry>(), Owner, Group, octalPermission ?? "770", false);
var metaData = new DirectoryEntryMetaData()
{
DataStream = type== DirectoryEntryType.FILE?new MemoryStream():null,
Entry = entry,
AclData = aclStatus,
CreationTime = DateTime.UtcNow
};
return metaData;
}
/// <summary>
/// Creates a directory- Creates an entry for the directory in the internal dictionary
/// </summary>
/// <param name="dirName">Directory name</param>
/// <param name="octalPermission">Octal permission</param>
/// <param name="cancelToken">Cacnellation token</param>
public override async Task<bool> CreateDirectoryAsync(string dirName, string octalPermission = null,
CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
_directoryEntries.Add(dirName, CreateMetaData(dirName, DirectoryEntryType.DIRECTORY, octalPermission)),cancelToken);
return true;
}
/// <summary>
/// Creates a directory- Creates an entry for the directory in the internal dictionary
/// </summary>
/// <param name="dirName">Directory name</param>
/// <param name="octalPermission">Octal permission</param>
/// <param name="cancelToken">Cacnellation token</param>
/// <returns></returns>
public override bool CreateDirectory(string dirName, string octalPermission = null,
CancellationToken cancelToken = default(CancellationToken))
{
return CreateDirectoryAsync(dirName, octalPermission, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Returns a memory stream for reading data of the file
/// </summary>
/// <param name="filename">File name</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override async Task<AdlsInputStream> GetReadStreamAsync(string filename,
CancellationToken cancelToken = default(CancellationToken))
{
return await Task.Run(() =>
{
if (!_directoryEntries.ContainsKey(filename))
{
throw new AdlsException("The file does not exists");
}
return new MockAdlsInputStream(_directoryEntries[filename].DataStream);
}, cancelToken);
}
/// <summary>
/// Returns a memory stream for reading data of the file
/// </summary>
/// <param name="filename">File name</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override AdlsInputStream GetReadStream(string filename, CancellationToken cancelToken = default(CancellationToken))
{
return GetReadStreamAsync(filename, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Returns the memory stream for appending to the file encapsulated in mock adls output stream.
/// </summary>
/// <param name="filename">File name</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override async Task<AdlsOutputStream> GetAppendStreamAsync(string filename,
CancellationToken cancelToken = default(CancellationToken))
{
return await Task.Run(() =>
{
if (!_directoryEntries.ContainsKey(filename))
{
throw new AdlsException("The file does not exists");
}
_directoryEntries[filename].DataStream.Seek(_directoryEntries[filename].DataStream.Length,
SeekOrigin.Begin);
return new MockAdlsOutputStream(_directoryEntries[filename].DataStream,
_directoryEntries[filename].Entry);
}, cancelToken);
}
/// <summary>
/// Returns the memory stream for appending to the file encapsulated in mock adls output stream.
/// </summary>
/// <param name="filename">File name</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override AdlsOutputStream GetAppendStream(string filename, CancellationToken cancelToken = default(CancellationToken))
{
return GetAppendStreamAsync(filename, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Creates an entry to the internal dictionary for the new file. The entry encapsulates AclStatus, DirectoryEntry and a memory stream
/// </summary>
/// <param name="filename">File name</param>
/// <param name="mode">If exists hether to overwrite or fail</param>
/// <param name="octalPermission">Permission string</param>
/// <param name="createParent">True if we create parent directories- currently has no effect</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns>Mock ADls output stream</returns>
public override async Task<AdlsOutputStream> CreateFileAsync(string filename, IfExists mode, string octalPermission = null,
bool createParent = true, CancellationToken cancelToken = default(CancellationToken))
{
return await Task.Run(() =>
{
if (mode == IfExists.Fail && _directoryEntries.ContainsKey(filename))
{
throw new AdlsException("The file exists");
}
if (mode == IfExists.Overwrite && _directoryEntries.ContainsKey(filename))
{
_directoryEntries.Remove(filename);
}
var metaData = CreateMetaData(filename, DirectoryEntryType.FILE, octalPermission);
_directoryEntries.Add(filename, metaData);
return new MockAdlsOutputStream(metaData.DataStream, metaData.Entry);
}, cancelToken);
}
/// <summary>
/// Creates an entry to the internal dictionary for the new file. The entry encapsulates AclStatus, DirectoryEntry and a memory stream
/// </summary>
/// <param name="filename">File name</param>
/// <param name="mode">If exists hether to overwrite or fail</param>
/// <param name="octalPermission">Permission string</param>
/// <param name="createParent">True if we create parent directories- currently has no effect</param>
/// <returns>Mock ADls output stream</returns>
public override AdlsOutputStream CreateFile(string filename, IfExists mode, string octalPermission = null,
bool createParent = true)
{
return CreateFileAsync(filename, mode, octalPermission, createParent).GetAwaiter().GetResult();
}
/// <summary>
/// Delete an entry from the internal dictionary
/// </summary>
/// <param name="path">Path of the file or directory</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override async Task<bool> DeleteAsync(string path, CancellationToken cancelToken = default(CancellationToken))
{
return await Task.Run(() =>
{
foreach (var directoryEntriesKey in _directoryEntries.Keys)
{
if (directoryEntriesKey.StartsWith(path + "/"))
{
return false;
}
}
bool ret = false;
if(_directoryEntries.ContainsKey(path))
{
var dirEntry = _directoryEntries[path];
ret = _directoryEntries.Remove(path);
var originalPath = dirEntry.Entry.FullName;
string trashPath = "/deleted/local/" + random.Next(10000000);
// Add it to the list of items in trash
lock (_trashDirectoryEntries)
{
_trashDirectoryEntries.Add(trashPath, dirEntry);
}
}
return ret;
}, cancelToken);
}
/// <summary>
/// Delete an entry from the internal dictionary
/// </summary>
/// <param name="path">Path of the file or directory</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override bool Delete(string path, CancellationToken cancelToken = default(CancellationToken))
{
return DeleteAsync(path, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Deletes all entries within a directory or delete a file
/// </summary>
/// <param name="path">Path of directory or file</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override async Task<bool> DeleteRecursiveAsync(string path, CancellationToken cancelToken = default(CancellationToken))
{
return await Task.Run(() =>
{
var keysToRemove = new List<string>();
foreach (var directoryEntriesKey in _directoryEntries.Keys)
{
if (directoryEntriesKey.StartsWith(path))
{
keysToRemove.Add(directoryEntriesKey);
}
}
foreach (var key in keysToRemove)
{
if (!_directoryEntries.Remove(key))
{
return false;
}
}
return true;
}, cancelToken);
}
/// <summary>
/// Deletes all entries within a directory or delete a file
/// </summary>
/// <param name="path">Path of directory or file</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override bool DeleteRecursive(string path, CancellationToken cancelToken = default(CancellationToken))
{
return DeleteRecursiveAsync(path, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Asynchronously gets the trash entries
/// </summary>
/// <param name="hint">String to match. Cannot be empty.</param>
/// <param name="listAfter">Token returned by system in the previous API invocation</param>
/// <param name="numResults">Search is executed until we find numResults or search completes. Maximum allowed value for this param is 4000. The number of returned entries could be more or less than numResults</param>
/// <param name="progressTracker">Object to track progress of the task. Can be null</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override async Task<IEnumerable<TrashEntry>> EnumerateDeletedItemsAsync(string hint, string listAfter, int numResults, IProgress<EnumerateDeletedItemsProgress> progressTracker, CancellationToken cancelToken)
{
return await Task.Run(() =>
{
List<TrashEntry> trashEntries = new List<TrashEntry>();
var matchingPaths = new List<string>();
int numSearched = 0;
int numFound = 0;
string nextListAfter = "";
var enumerator = _trashDirectoryEntries.GetEnumerator();
// Search after "listafter" element
if (!string.IsNullOrEmpty(listAfter))
{
while (enumerator.MoveNext())
{
if (enumerator.Current.Key.Equals(listAfter))
{
break;
}
}
}
// Start hint comparison now
while (enumerator.MoveNext())
{
numSearched++;
if (enumerator.Current.Value.Entry.FullName.Contains(hint))
{
numFound++;
matchingPaths.Add(enumerator.Current.Key);
}
if(numFound == numResults)
{
nextListAfter = enumerator.Current.Key;
break;
}
}
foreach(var trashPath in matchingPaths)
{
var dirEntry = _trashDirectoryEntries[trashPath];
//convert to fsurls
TrashEntry trashEntry = new TrashEntry();
trashEntry.OriginalPath = "adl://" + accountName + dirEntry.Entry.FullName;
trashEntry.TrashDirPath = "adl://" + accountName + trashPath;
trashEntry.CreationTime = dirEntry.CreationTime;
trashEntry.Type = (dirEntry.Entry.Type == DirectoryEntryType.DIRECTORY ? TrashEntryType.DIRECTORY : TrashEntryType.FILE);
trashEntries.Add(trashEntry);
}
if (progressTracker != null)
{
EnumerateDeletedItemsProgress progress = new EnumerateDeletedItemsProgress { NumFound = numFound, NumSearched = numSearched, NextListAfter = nextListAfter };
progressTracker.Report(progress);
}
return trashEntries;
}, cancelToken);
}
/// <summary>
/// Search trash under a account with hint and a starting point. This is a long running operation,
/// and user is updated with progress periodically.
/// </summary>
/// <param name="hint">String to match</param>
/// <param name="listAfter">Token returned by system in the previous API invocation</param>
/// <param name="numResults">Search is executed until we find numResults or search completes. Maximum allowed value for this param is 4000. The number of returned entries could be more or less than numResults</param>
/// <param name="progressTracker">Object to track progress of the task. Can be null</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override IEnumerable<TrashEntry> EnumerateDeletedItems(string hint, string listAfter, int numResults, IProgress<EnumerateDeletedItemsProgress> progressTracker, CancellationToken cancelToken = default(CancellationToken))
{
return EnumerateDeletedItemsAsync(hint, listAfter, numResults, progressTracker, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Synchronously Restores trash entry
/// </summary>
/// <param name="pathOfFileToRestoreInTrash">Trash Directory path returned by enumeratedeleteditems</param>
/// <param name="restoreDestination">Destination for restore</param>
/// <param name="type">type of restore - file or directory</param>
/// <param name="restoreAction">Action to take in case of destination conflict</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override void RestoreDeletedItems(string pathOfFileToRestoreInTrash, string restoreDestination, string type, string restoreAction = "", CancellationToken cancelToken = default(CancellationToken))
{
RestoreDeletedItemsAsync(pathOfFileToRestoreInTrash, restoreDestination, type, restoreAction, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Asynchronously Restores trash entry
/// </summary>
/// <param name="pathOfFileToRestoreInTrash">Trash Directory path returned by enumeratedeleteditems</param>
/// <param name="restoreDestination">Destination for restore</param>
/// <param name="type">type of restore - file or directory</param>
/// <param name="restoreAction">Action to take in case of destination conflict</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override async Task RestoreDeletedItemsAsync(string pathOfFileToRestoreInTrash, string restoreDestination, string type, string restoreAction = "", CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
{
// convert FsUrls back to relative paths
pathOfFileToRestoreInTrash = pathOfFileToRestoreInTrash.Substring(("adl://" + accountName).Length);
restoreDestination = restoreDestination.Substring(("adl://" + accountName).Length);
if (!_trashDirectoryEntries.Keys.Contains(pathOfFileToRestoreInTrash))
{
throw new ArgumentException();
}
var enumerator = _trashDirectoryEntries.GetEnumerator();
while(enumerator.MoveNext())
{
if(enumerator.Current.Key.Equals(pathOfFileToRestoreInTrash))
{
var deletedEntry = _trashDirectoryEntries[pathOfFileToRestoreInTrash];
var entryType = deletedEntry.Entry.Type;
if(String.IsNullOrEmpty(type) || !(type.Equals("file") || type.Equals("folder")))
{
throw new ArgumentException();
}
if((type.Equals("file") ? DirectoryEntryType.FILE : DirectoryEntryType.DIRECTORY) != entryType)
{
throw new ArgumentException();
}
// Make sure there is no conflict
if (_directoryEntries.ContainsKey(restoreDestination))
{
if (String.Equals(restoreAction, "copy", StringComparison.OrdinalIgnoreCase))
{
restoreDestination += "_" + random.Next(10000000);
}
else if (String.Equals(restoreAction, "overwrite", StringComparison.OrdinalIgnoreCase)) // TODO: type is stream
{
if (String.Equals(type, "folder", StringComparison.OrdinalIgnoreCase))
{
throw new ArgumentException();
}
}
}
// Start the restore;
deletedEntry.Entry.FullName = restoreDestination;
deletedEntry.Entry.Name = restoreDestination.Substring(restoreDestination.LastIndexOf("/") + 1); // TODO: fix name
// Add it back
_directoryEntries.Add(restoreDestination, deletedEntry);
// Remove it from trash items
_trashDirectoryEntries.Remove(pathOfFileToRestoreInTrash);
break;
}
}
return;
}, cancelToken);
}
private void MoveOneEntry(string src, string dest)
{
var metaData = _directoryEntries[src];
metaData.Entry.Name = metaData.Entry.FullName = dest;
_directoryEntries.Add(dest, metaData);
_directoryEntries.Remove(src);
}
// Moves all files in the directory
private void MoveDirectory(string srcPath, string destPath)
{
var list=new List<string>();
foreach (var path in _directoryEntries.Keys)
{
if (path.StartsWith(srcPath+"/"))
{
list.Add(path);
}
}
foreach (var entry in list)
{
string destfileName = destPath + "/" + Path.GetFileName(entry);
MoveOneEntry(entry,destfileName);
}
// If the user ahs created the directory explicitly it will exist in the dictionary
if (_directoryEntries.ContainsKey(srcPath))
{
MoveOneEntry(srcPath,destPath);
}
}
/// <summary>
/// Removes the source entry and add a new entry in the internal dictionary with the same metadata of the source entry
/// </summary>
/// <param name="path">Source path name</param>
/// <param name="destination">Destination path</param>
/// <param name="overwrite">True if we want to overwrite the destination file</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override async Task<bool> RenameAsync(string path, string destination, bool overwrite = false,
CancellationToken cancelToken = default(CancellationToken))
{
return await Task.Run(() =>
{
if (!_directoryEntries.ContainsKey(path))
{
throw new AdlsException("the path does not exist");
}
// The destination exist part is still not fully mocked.
if (_directoryEntries.ContainsKey(destination))
{
if (_directoryEntries[destination].Entry.Type == DirectoryEntryType.FILE)
{
MoveOneEntry(path, destination);
return true;
}
var destEntryName = path + "/" + Path.GetFileName(path);
if (_directoryEntries.ContainsKey(destEntryName))
{
return false;
}
MoveDirectory(path, destEntryName);
}
else
{
if (_directoryEntries[path].Entry.Type == DirectoryEntryType.FILE)
{
MoveOneEntry(path, destination);
}
else
{
MoveDirectory(path, destination);
}
}
return true;
}, cancelToken);
}
/// <summary>
/// Removes the source entry and add a new entry in the internal dictionary with the same metadata of the source entry
/// </summary>
/// <param name="path">Source path name</param>
/// <param name="destination">Destination path</param>
/// <param name="overwrite">True if we want to overwrite the destination file</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override bool Rename(string path, string destination, bool overwrite = false,
CancellationToken cancelToken = default(CancellationToken))
{
return RenameAsync(path, destination, overwrite, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Get Directory or file info
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="userIdFormat">User or group Id format</param>
/// <param name="cancelToken">Cancellation token</param>
public override async Task<DirectoryEntry> GetDirectoryEntryAsync(string path,
UserGroupRepresentation userIdFormat = UserGroupRepresentation.ObjectID,
CancellationToken cancelToken = default(CancellationToken))
{
return await Task.Run(() =>
{
// Update the length here
if (_directoryEntries.ContainsKey(path))
{
if (_directoryEntries[path].Entry.Type == DirectoryEntryType.FILE)
{
_directoryEntries[path].Entry.Length =
_directoryEntries[path].DataStream.Length; // Update the stream length
}
return new DirectoryEntry(_directoryEntries[path].Entry); // send deep copy
}
// The input path path can be a directory which is not in the dictionary
foreach (var entries in _directoryEntries.Keys)
{
if (entries.StartsWith(path + "/")) // This has to be directory
{
return new DirectoryEntry(path)
{
Type = DirectoryEntryType.DIRECTORY,
Length = 0
};
}
}
throw new AdlsException("Not exist") {HttpStatus = HttpStatusCode.NotFound};
}, cancelToken);
}
/// <summary>
/// Get Directory or file info
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="userIdFormat">User or group Id format</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override DirectoryEntry GetDirectoryEntry(string path,
UserGroupRepresentation userIdFormat = UserGroupRepresentation.ObjectID,
CancellationToken cancelToken = default(CancellationToken))
{
return GetDirectoryEntryAsync(path, userIdFormat, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Concats the memory stream of source entries and merges them into a new memory stream
/// </summary>
/// <param name="destination">Destination entry</param>
/// <param name="concatFiles">Concat files</param>
/// <param name="deleteSource">True if we want to delete source</param>
/// <param name="cancelToken">Cancellation token</param>
public override async Task ConcatenateFilesAsync(string destination, List<string> concatFiles, bool deleteSource = false,
CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
{
var memoryStream = CreateFile(destination, IfExists.Overwrite);
foreach (var file in concatFiles)
{
if (!_directoryEntries.ContainsKey(file))
{
throw new AdlsException("The file to concat does not exist");
}
_directoryEntries[file].DataStream.CopyTo(memoryStream);
Delete(file);
}
}, cancelToken);
}
/// <summary>
/// Concats the memory stream of source entries and merges them into a new memory stream
/// </summary>
/// <param name="destination">Destination entry</param>
/// <param name="concatFiles">Concat files</param>
/// <param name="deleteSource">True if we want to delete source</param>
/// <param name="cancelToken">Cancellation token</param>
public override void ConcatenateFiles(string destination, List<string> concatFiles, bool deleteSource = false,
CancellationToken cancelToken = default(CancellationToken))
{
ConcatenateFilesAsync(destination, concatFiles, deleteSource, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Returns a list of entries contained under the given directory
/// </summary>
/// <param name="path">Path of directory or file</param>
/// <param name="userIdFormat">User or group Id format</param>
/// <param name="cancelToken">Cancellation token</param>
/// <returns></returns>
public override IEnumerable<DirectoryEntry> EnumerateDirectory(string path, UserGroupRepresentation userIdFormat = UserGroupRepresentation.ObjectID,
CancellationToken cancelToken = default(CancellationToken))
{
if (!path.EndsWith("/"))
{
path = path + "/";
}
var listToReturn = new List<DirectoryEntry>();
// This is not fully mocked. Very naive only works for directory that has files and empty directories
foreach (var directoryEntriesKey in _directoryEntries.Keys)
{
if (directoryEntriesKey.StartsWith(path))
{
listToReturn.Add(new DirectoryEntry(_directoryEntries[directoryEntriesKey].Entry));
}
}
return listToReturn;
}
/// <summary>
/// Sets the expiry time for the file.
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="eopt">Expiry option</param>
/// <param name="expiryTime">Expiry time</param>
/// <param name="cancelToken">Cancellation token</param>
public override async Task SetExpiryTimeAsync(string path, ExpiryOption eopt, long expiryTime,
CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
{
if (!_directoryEntries.ContainsKey(path))
{
throw new AdlsException("The file does not exist.");
}
DateTime? dateTimeToSet;
if (eopt == ExpiryOption.Absolute)
{
dateTimeToSet = DirectoryEntry.GetDateTimeFromServerTime(expiryTime);
}
else if (eopt == ExpiryOption.NeverExpire)
{
dateTimeToSet = null;
}
else if (eopt == ExpiryOption.RelativeToNow)
{
dateTimeToSet = DateTime.UtcNow + new TimeSpan(expiryTime * 10000);
}
else
{
dateTimeToSet = _directoryEntries[path].CreationTime + new TimeSpan(expiryTime * 10000);
}
_directoryEntries[path].Entry.ExpiryTime = dateTimeToSet;
}, cancelToken);
}
/// <summary>
/// Sets the expiry time for the file.
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="eopt">Expiry option</param>
/// <param name="expiryTime">Expiry time</param>
/// <param name="cancelToken">Cancellation token</param>
public override void SetExpiryTime(string path, ExpiryOption eopt, long expiryTime,
CancellationToken cancelToken = default(CancellationToken))
{
SetExpiryTimeAsync(path, eopt, expiryTime, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Sets the permission string for the given path
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="permission">Permission string</param>
/// <param name="cancelToken">Cancellation token</param>
public override async Task SetPermissionAsync(string path, string permission,
CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
{
if (!_directoryEntries.ContainsKey(path))
{
throw new AdlsException("The file does not exist.");
}
_directoryEntries[path].Entry.Permission = _directoryEntries[path].AclData.Permission = permission;
}, cancelToken);
}
/// <summary>
/// Sets the permission string for the given path
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="permission">Permission string</param>
/// <param name="cancelToken">Cancellation token</param>
public override void SetPermission(string path, string permission,
CancellationToken cancelToken = default(CancellationToken))
{
SetPermissionAsync(path, permission, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Adds acl entries for a given path
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="aclSpec">Acl list to append</param>
/// <param name="cancelToken">Cancellation token</param>
public override async Task ModifyAclEntriesAsync(string path, List<AclEntry> aclSpec,
CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
{
// Very naive, can be made intelligent by checking whether the acls are already present
_directoryEntries[path].AclData.Entries.AddRange(aclSpec);
_directoryEntries[path].Entry.HasAcl = true;
}, cancelToken);
}
/// <summary>
/// Adds acl entries for a given path
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="aclSpec">Acl list to append</param>
/// <param name="cancelToken">Cancellation token</param>
public override void ModifyAclEntries(string path, List<AclEntry> aclSpec,
CancellationToken cancelToken = default(CancellationToken))
{
ModifyAclEntriesAsync(path, aclSpec, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Sets new acl entries for the given path.
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="aclSpec">Acl list to set</param>
/// <param name="cancelToken">Cancellation token</param>
public override async Task SetAclAsync(string path, List<AclEntry> aclSpec,
CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
{
var newAclList = new List<AclEntry>();
foreach (var aclEntry in aclSpec)
{
if (((aclEntry.Type == AclType.user || aclEntry.Type == AclType.group) &&
string.IsNullOrEmpty(aclEntry.UserOrGroupId)) || aclEntry.Type == AclType.other)
{
continue;
}
newAclList.Add(aclEntry);
}
_directoryEntries[path].AclData.Entries = newAclList;
_directoryEntries[path].Entry.HasAcl = true;
}, cancelToken);
}
/// <summary>
/// Sets new acl entries for the given path.
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="aclSpec">Acl list to set</param>
/// <param name="cancelToken">Cancellation token</param>
public override void SetAcl(string path, List<AclEntry> aclSpec,
CancellationToken cancelToken = default(CancellationToken))
{
SetAclAsync(path, aclSpec, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Sets the owner and group of the path
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="owner">Owner guid</param>
/// <param name="group">Group guid</param>
/// <param name="cancelToken">Cancellation token</param>
public override async Task SetOwnerAsync(string path, string owner, string group,
CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
{
var metaData = _directoryEntries[path];
if (!string.IsNullOrEmpty(owner))
{
metaData.Entry.User = metaData.AclData.Owner = owner;
}
if (!string.IsNullOrEmpty(group))
{
metaData.Entry.Group = metaData.AclData.Group = group;
}
}, cancelToken);
}
/// <summary>
/// Sets the owner and group of the path
/// </summary>
/// <param name="path">Path of file or directory</param>
/// <param name="owner">Owner guid</param>
/// <param name="group">Group guid</param>
/// <param name="cancelToken">Cancellation token</param>
public override void SetOwner(string path, string owner, string group,
CancellationToken cancelToken = default(CancellationToken))
{
SetOwnerAsync(path, owner, group, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Removes specified Acl Entries for a file or directory from the internal AclStatus maintained in memory.
/// </summary>
/// <param name="path">Path of the file or directory</param>
/// <param name="aclSpec">List of Acl Entries to remove</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override async Task RemoveAclEntriesAsync(string path, List<AclEntry> aclSpec, CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
{
var toRemove = new List<AclEntry>();
foreach (var aclEntryToRemove in aclSpec)
{
foreach (var entry in _directoryEntries[path].AclData.Entries)
{
if (entry.Type.Equals(aclEntryToRemove.Type) &&
entry.UserOrGroupId.Equals(aclEntryToRemove.UserOrGroupId) &&
entry.Scope.Equals(aclEntryToRemove.Scope))
{
toRemove.Add(entry);
}
}
}
foreach (var entry in toRemove)
{
_directoryEntries[path].AclData.Entries.Remove(entry);
}
}, cancelToken);
}
/// <summary>
/// Removes specified Acl Entries for a file or directory from the internal AclStatus maintained in memory.
/// </summary>
/// <param name="path">Path of the file or directory</param>
/// <param name="aclSpec">List of Acl Entries to remove</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override void RemoveAclEntries(string path, List<AclEntry> aclSpec,
CancellationToken cancelToken = default(CancellationToken))
{
RemoveAclEntriesAsync(path, aclSpec, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Removes all Acl Entries for a file or directory from the internal AclStatus maintained in memory.
/// </summary>
/// <param name="path">Path of the file or directory</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override async Task RemoveAllAclsAsync(string path, CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
{
_directoryEntries[path].AclData.Entries = new List<AclEntry>();
_directoryEntries[path].Entry.HasAcl = false;
}, cancelToken);
}
/// <summary>
/// Removes all Acl Entries for a file or directory from the internal AclStatus maintained in memory.
/// </summary>
/// <param name="path">Path of the file or directory</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override void RemoveAllAcls(string path, CancellationToken cancelToken = default(CancellationToken))
{
RemoveAllAclsAsync(path, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Removes all Acl Entries of AclScope default for a file or directory from the internal AclStatus maintained in memory.
/// </summary>
/// <param name="path">Path of the file or directory</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override async Task RemoveDefaultAclsAsync(string path, CancellationToken cancelToken = default(CancellationToken))
{
await Task.Run(() =>
{
var defaultAclList = new List<AclEntry>();
foreach (var aclEntry in _directoryEntries[path].AclData.Entries)
{
if (aclEntry.Scope == AclScope.Default)
{
defaultAclList.Add(aclEntry);
}
}
foreach (var aclEntry in defaultAclList)
{
_directoryEntries[path].AclData.Entries.Remove(aclEntry);
}
}, cancelToken);
}
/// <summary>
/// Removes all Acl Entries of AclScope default for a file or directory from the internal AclStatus maintained in memory.
/// </summary>
/// <param name="path">Path of the file or directory</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override void RemoveDefaultAcls(string path, CancellationToken cancelToken = default(CancellationToken))
{
RemoveDefaultAclsAsync(path, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Gets the ACL entry list, owner ID, group ID, octal permission and sticky bit (only for a directory) of the file/directory
/// </summary>
/// <param name="path">Path of the file or directory</param>
/// <param name="userIdFormat">way to represent the user/group object</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override async Task<AclStatus> GetAclStatusAsync(string path, UserGroupRepresentation userIdFormat = UserGroupRepresentation.ObjectID, CancellationToken cancelToken = default(CancellationToken))
{
return await Task.Run(() => new AclStatus(_directoryEntries[path].AclData), cancelToken);
}
/// <summary>
/// Gets the ACL entry list, owner ID, group ID, octal permission and sticky bit (only for a directory) of the file/directory
/// </summary>
/// <param name="path">Path of the file or directory</param>
/// <param name="userIdFormat">way to represent the user/group object</param>
/// <param name="cancelToken">CancellationToken to cancel the request</param>
public override AclStatus GetAclStatus(string path, UserGroupRepresentation userIdFormat = UserGroupRepresentation.ObjectID, CancellationToken cancelToken = default(CancellationToken))
{
return GetAclStatusAsync(path, userIdFormat, cancelToken).GetAwaiter().GetResult();
}
/// <summary>
/// Bulk uploads file only. Reads a local file and maintains the memory stream for the entry
/// </summary>
/// <param name="srcPath">Local source path</param>
/// <param name="destPath">Remote destination path - It should always be a directory.</param>
/// <param name="numThreads">Not used</param>
/// <param name="shouldOverwrite">Whether to overwrite or skip if the destination exists</param>
/// <param name="progressTracker">Not used</param>
/// <param name="notRecurse">Not used</param>
/// <param name="resume">Not used</param>
/// <param name="isBinary">Not used</param>
/// <returns>Transfer Status encapsulating the details of upload</returns>
public override TransferStatus BulkUpload(string srcPath, string destPath, int numThreads = -1, IfExists shouldOverwrite = IfExists.Overwrite, IProgress<TransferStatus> progressTracker = null, bool notRecurse = false, bool resume = false, bool isBinary = false, CancellationToken cancelToken=default(CancellationToken))
{
// Currently this is for a single file
if (!File.Exists(srcPath))
{
throw new ArgumentException("Currently not supported for folder");
}
using (Stream localStream = new FileStream(srcPath, FileMode.Open, FileAccess.Read),
adlsStream=CreateFile(destPath,shouldOverwrite))
{
byte[] buff=new byte[AdlsOutputStream.BufferMaxCapacity];
while (true)
{
int bytesRead = localStream.Read(buff, 0, buff.Length);
if (bytesRead > 0)
{
adlsStream.Write(buff, 0, bytesRead);
}
else
{
break;
}
}
}
return new TransferStatus();
}
/// <summary>
/// Reads data from memory stream and save it to local file
/// </summary>
/// <param name="srcPath">Remote source path</param>
/// <param name="destPath">Local destination path. It should always be a directory.</param>
/// <param name="numThreads">Not used</param>
/// <param name="shouldOverwrite">Whether to overwrite or skip if the destination exists</param>
/// <param name="progressTracker">Not used</param>
/// <param name="notRecurse">Not used</param>
/// <param name="resume">Not used</param>
/// <returns>Transfer status encapsulating the details of download</returns>
public override TransferStatus BulkDownload(string srcPath, string destPath, int numThreads = -1, IfExists shouldOverwrite = IfExists.Overwrite, IProgress<TransferStatus> progressTracker = null, bool notRecurse = false, bool resume = false, CancellationToken cancelToken=default(CancellationToken))
{
var entry = _directoryEntries[srcPath].Entry;
if (entry.Type != DirectoryEntryType.FILE)
{
throw new ArgumentException("Currently not supported for folder");
}
using (Stream localStream = new FileStream(destPath, FileMode.Create, FileAccess.ReadWrite),
adlsStream = GetReadStream(srcPath))
{
byte[] buff = new byte[AdlsOutputStream.BufferMaxCapacity];
while (true)
{
int bytesRead = adlsStream.Read(buff, 0, buff.Length);
if (bytesRead > 0)
{
localStream.Write(buff, 0, bytesRead);
}
else
{
break;
}
}
}
return new TransferStatus();
}
/// <summary>
/// Currently the recursive entities need to be created separately for mock testing
/// </summary>
/// <param name="path"></param>
/// <param name="aclEntries"></param>
/// <param name="type"></param>
/// <param name="threadCount"></param>
/// <param name="statusTracker"></param>
/// <param name="cancelToken"></param>
/// <returns></returns>
public override AclProcessorStats ChangeAcl(string path, List<AclEntry> aclEntries, RequestedAclType type, int threadCount, IProgress<AclProcessorStats> statusTracker, CancellationToken cancelToken)
{
return ChangeAcl(path, aclEntries, type, threadCount);
}
/// <summary>
/// Currently the recursive entities need to be created separately for mock testing
/// </summary>
/// <param name="path"></param>
/// <param name="aclEntries"></param>
/// <param name="type"></param>
/// <param name="threadCount"></param>
/// <returns></returns>
public override AclProcessorStats ChangeAcl(string path, List<AclEntry> aclEntries, RequestedAclType type, int threadCount = -1)
{
int numDirs = 0, numFiles = 0;
ConcurrentBag<string> sharePath = new ConcurrentBag<string>();
foreach (var directoryEntriesKey in _directoryEntries.Keys)
{
if (directoryEntriesKey.StartsWith(path))
{
switch (type)
{
case RequestedAclType.ModifyAcl: ModifyAclEntries(_directoryEntries[directoryEntriesKey].Entry.FullName,aclEntries);
break;
case RequestedAclType.RemoveAcl:
RemoveAclEntries(_directoryEntries[directoryEntriesKey].Entry.FullName, aclEntries);
break;
case RequestedAclType.SetAcl:
SetAcl(_directoryEntries[directoryEntriesKey].Entry.FullName, aclEntries);
break;
}
var dirEntry = _directoryEntries[directoryEntriesKey].Entry;
if (dirEntry.Type == DirectoryEntryType.DIRECTORY)
{
if (dirEntry.Attribute != null && dirEntry.Attribute.Contains(DirectoryEntryAttributeType.Link))
{
sharePath.Add(dirEntry.FullName);
}
numDirs++;
}
else
{
numFiles++;
}
}
}
return new AclProcessorStats(numFiles,numDirs, 0, 0, sharePath);
}
/// <summary>
/// Currently the recursive entities need to be created separately for mock testing
/// </summary>
/// <param name="path"></param>
/// <param name="numThreads"></param>
/// <param name="cancelToken"></param>
/// <returns></returns>
public override ContentSummary GetContentSummary(string path, int numThreads = -1,
CancellationToken cancelToken = default(CancellationToken))
{
int numDirs = 0, numFiles = 0;
long numSize = 0;
foreach (var directoryEntriesKey in _directoryEntries.Keys)
{
if (directoryEntriesKey.StartsWith(path) && !directoryEntriesKey.Equals(path))
{
if (_directoryEntries[directoryEntriesKey].Entry.Type == DirectoryEntryType.DIRECTORY)
{
numDirs++;
}
else
{
numFiles++;
numSize += _directoryEntries[directoryEntriesKey].Entry.Length;
}
}
}
return new ContentSummary(numDirs, numFiles, numSize, numSize);
}
/// <summary>
/// Gets fileproperties, conmsistentacl is always true since this is mock
/// </summary>
/// <param name="path"></param>
/// <param name="getAclUsage"></param>
/// <param name="dumpFileName"></param>
/// <param name="getDiskUsage"></param>
/// <param name="saveToLocal"></param>
/// <param name="numThreads"></param>
/// <param name="displayFiles"></param>
/// <param name="hideConsistentAcl"></param>
/// <param name="maxDepth"></param>
public override void GetFileProperties(string path, bool getAclUsage, string dumpFileName, bool getDiskUsage = true,
bool saveToLocal = true, int numThreads = -1, bool displayFiles = false, bool hideConsistentAcl = false,
long maxDepth = Int64.MaxValue, CancellationToken cancelToken = default(CancellationToken))
{
if (!(getAclUsage || getDiskUsage))
{
throw new ArgumentException("At least one option of getAclUsage and getDiskUsage need to be set as true.");
}
if (!displayFiles && hideConsistentAcl)
{
throw new ArgumentException("hideConsistentAcl cannot be true when displayFiles is false because consistent Acl cannot be determined unless we retrieve acls for the files also.");
}
if (saveToLocal)
{
Utils.CreateParentDirectory(dumpFileName);
}
using (var propertyDumpWriter = new StreamWriter((saveToLocal
? new FileStream(dumpFileName, FileMode.Create, FileAccess.ReadWrite)
: (Stream)CreateFile(dumpFileName, IfExists.Overwrite))))
{
WriteHeader(propertyDumpWriter, getDiskUsage, getAclUsage, displayFiles, hideConsistentAcl);
foreach (var directoryEntriesKey in _directoryEntries.Keys)
{
if (directoryEntriesKey.StartsWith(path))
{
var summary = _directoryEntries[directoryEntriesKey].Entry.Type == DirectoryEntryType.DIRECTORY
? GetContentSummary(_directoryEntries[directoryEntriesKey].Entry.FullName)
: new ContentSummary(0, 1, _directoryEntries[directoryEntriesKey].Entry.Length,
_directoryEntries[directoryEntriesKey].Entry.Length);
var status = GetAclStatus(_directoryEntries[directoryEntriesKey].Entry.FullName);
string output = "";
if (getDiskUsage)
{
output =
$"1{FileProperties.PropertyManager.OuputLineSeparator}1{FileProperties.PropertyManager.OuputLineSeparator}1{FileProperties.PropertyManager.OuputLineSeparator}{summary.Length}{FileProperties.PropertyManager.OuputLineSeparator}{summary.FileCount}{FileProperties.PropertyManager.OuputLineSeparator}{summary.DirectoryCount}";
}
if (getAclUsage)
{
bool showAclConsistentColumn = displayFiles || hideConsistentAcl;
output +=
$"{(string.IsNullOrEmpty(output) ? "" : $"{FileProperties.PropertyManager.OuputLineSeparator}")}{string.Join("|", status.Entries)}{(showAclConsistentColumn ? $"{FileProperties.PropertyManager.OuputLineSeparator}true" : "")}";
}
propertyDumpWriter.WriteLine($"{_directoryEntries[directoryEntriesKey].Entry.FullName}{FileProperties.PropertyManager.OuputLineSeparator}{_directoryEntries[directoryEntriesKey].Entry.Type}{FileProperties.PropertyManager.OuputLineSeparator}{output}");
}
}
}
}
private void WriteHeader(StreamWriter propertyDumpWriter, bool getSizeProperty, bool getAclProperty, bool displayFiles, bool hideConsistentAclTree)
{
string output = "";
if (getSizeProperty)
{
output =
$"Total size of direct child files and directories{FileProperties.PropertyManager.OuputLineSeparator}Total number of direct files{FileProperties.PropertyManager.OuputLineSeparator}Total number of direct directories{FileProperties.PropertyManager.OuputLineSeparator}Total size{FileProperties.PropertyManager.OuputLineSeparator}Total number of files{FileProperties.PropertyManager.OuputLineSeparator}Total number of directories";
}
if (getAclProperty)
{
// If DisplayFiles is false that means HideConsistentAcl has to be false (there is a check at entry point in client)/
// And if DisplayFiles is false consistentAcl information is not correct since acl of files is not known
bool showAclConsistentColumn = displayFiles || hideConsistentAclTree;
output +=
$"{(string.IsNullOrEmpty(output) ? "" : $"{FileProperties.PropertyManager.OuputLineSeparator}")}Acl Entries{(showAclConsistentColumn ? FileProperties.PropertyManager.OuputLineSeparator + "Whether Acl is same for all descendants" : "")}";
}
propertyDumpWriter.WriteLine($"Entry name{FileProperties.PropertyManager.OuputLineSeparator}Entry Type{FileProperties.PropertyManager.OuputLineSeparator}{output}");
}
}
}