code/KustoCopyConsole/Storage/AzureStorage/AzureBlobFileSystem.cs (71 lines of code) (raw):

using Azure; using Azure.Core; using Azure.Storage.Blobs.Specialized; using Azure.Storage.Files.DataLake; using Azure.Storage.Files.DataLake.Models; using Polly; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace KustoCopyConsole.Storage.AzureStorage { internal class AzureBlobFileSystem : IFileSystem { private static readonly AsyncPolicy _writeBlockRetryPolicy = Policy.Handle<RequestFailedException>() .RetryAsync(3); private readonly DataLakeDirectoryClient _rootDirectory; private readonly TokenCredential _tokenCredential; /// <summary>Construct an Azure Blob based <see cref="IFileSystem"/>.</summary> /// <param name="dataLakeRootUrl">Root directory of the file system.</param> public AzureBlobFileSystem(string dataLakeRootUrl, TokenCredential tokenCredential) { _rootDirectory = new DataLakeDirectoryClient( new Uri(dataLakeRootUrl), tokenCredential); _tokenCredential = tokenCredential; } async Task<Stream?> IFileSystem.OpenReadAsync(string path, CancellationToken ct) { var fileClient = _rootDirectory.GetFileClient(path); if (await fileClient.ExistsAsync(ct)) { return await fileClient.OpenReadAsync(new DataLakeOpenReadOptions(false), ct); } else { return null; } } async Task<IAppendStorage> IFileSystem.OpenWriteAsync(string path, CancellationToken ct) { var fileClient = _rootDirectory.GetFileClient(path); if (await fileClient.ExistsAsync(ct)) { throw new InvalidOperationException($"Blob '{fileClient.Uri}' already exists"); } else { var blobClient = new AppendBlobClient(fileClient.Uri, _tokenCredential); var storage = new AzureBlobAppendStorage( blobClient, _writeBlockRetryPolicy); return storage; } } async Task IFileSystem.MoveAsync(string source, string destination, CancellationToken ct) { var sourceFile = _rootDirectory.GetFileClient(source); var destinationFile = _rootDirectory.GetFileClient(destination); await _writeBlockRetryPolicy.ExecuteAsync(async () => { await sourceFile.RenameAsync(destinationFile.Path); }); } async Task IFileSystem.RemoveFolderAsync(string path, CancellationToken ct) { var subDirectory = _rootDirectory.GetSubDirectoryClient(path); await subDirectory.DeleteIfExistsAsync(); } } }