code/KustoCopyConsole/Storage/AzureStorage/AzureBlobUriProvider.cs (151 lines of code) (raw):
using Azure.Core;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
using Azure.Storage.Files.DataLake;
using Azure.Storage.Sas;
using KustoCopyConsole.Concurrency;
using KustoCopyConsole.Entity.RowItems.Keys;
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace KustoCopyConsole.Storage.AzureStorage
{
/// <summary>
/// <see cref="IStagingBlobUriProvider"/> implementation over one or more staging root
/// directories.  Each root gets its own <see cref="DirectoryProvider"/>, keyed by the
/// string form of the root directory URI, which hands out SAS-authorized URIs signed
/// with a cached user delegation key.
/// </summary>
internal class AzureBlobUriProvider : IStagingBlobUriProvider
{
    #region Inner Types
    /// <summary>
    /// Wraps a single staging root directory: builds SAS-authorized URIs under it and
    /// deletes sub directories of it.
    /// </summary>
    private class DirectoryProvider
    {
        private readonly DataLakeDirectoryClient _directoryClient;
        private readonly BlobContainerClient _containerClient;
        private readonly AsyncCache<UserDelegationKey> _keyCache;

        /// <summary>Creates the storage clients for one root directory.</summary>
        /// <param name="rootFolderUri">URI of the staging root directory.</param>
        /// <param name="credential">Credential used by both storage clients.</param>
        public DirectoryProvider(Uri rootFolderUri, TokenCredential credential)
        {
            _directoryClient = new DataLakeDirectoryClient(rootFolderUri, credential);
            _containerClient = new BlobContainerClient(rootFolderUri, credential);
            _keyCache = new(FetchUserDelegationKey);
        }

        /// <summary>URI of the root directory, as reported by the directory client.</summary>
        public Uri RootDirectoryUri => _directoryClient.Uri;

        /// <summary>
        /// Returns the URI of <paramref name="subPath"/> under the root directory, carrying
        /// a write-only SAS token that expires after <c>WRITE_TIME_OUT</c>.
        /// </summary>
        /// <param name="subPath">Path of the folder, relative to the root directory.</param>
        /// <param name="ct">Token passed to the delegation-key cache lookup.</param>
        public async Task<Uri> GetWritableFolderUriAsync(string subPath, CancellationToken ct)
        {
            var subDirectoryClient = _directoryClient.GetSubDirectoryClient(subPath);

            return await CreateSasUriAsync(
                subDirectoryClient.Uri,
                "c", // "b" for blob, "c" for container
                BlobSasPermissions.Write,
                WRITE_TIME_OUT,
                ct);
        }

        /// <summary>
        /// Returns <paramref name="uri"/> with its query replaced by a read-only SAS token
        /// that expires after <c>READ_TIME_OUT</c>.
        /// </summary>
        /// <param name="uri">URI to authorize.</param>
        /// <param name="ct">Token passed to the delegation-key cache lookup.</param>
        public async Task<Uri> AuthorizeUriAsync(Uri uri, CancellationToken ct)
        {
            // NOTE(review): the resource is declared as "b" (blob) but no BlobName is set
            // on the SAS builder; confirm the resulting SAS scope is the intended one.
            return await CreateSasUriAsync(
                uri,
                "b", // "b" for blob, "c" for container
                BlobSasPermissions.Read,
                READ_TIME_OUT,
                ct);
        }

        /// <summary>Deletes <paramref name="subDirectory"/> under the root directory.</summary>
        /// <param name="subDirectory">Path of the directory, relative to the root.</param>
        /// <param name="ct">Token that cancels the delete request.</param>
        public async Task DeleteStagingDirectoryAsync(string subDirectory, CancellationToken ct)
        {
            var subDirectoryClient = _directoryClient.GetSubDirectoryClient(subDirectory);

            // The token is forwarded so that the storage call can actually be cancelled
            await subDirectoryClient.DeleteAsync(cancellationToken: ct);
        }

        /// <summary>
        /// Signs a SAS token with the cached user delegation key and returns
        /// <paramref name="targetUri"/> with that token as its query string
        /// (any existing query is replaced).
        /// </summary>
        /// <param name="targetUri">URI the token is attached to.</param>
        /// <param name="resource">SAS resource kind ("b" or "c").</param>
        /// <param name="permissions">Permissions granted by the token.</param>
        /// <param name="lifetime">Token lifetime, counted from the current UTC time.</param>
        /// <param name="ct">Token passed to the delegation-key cache lookup.</param>
        private async Task<Uri> CreateSasUriAsync(
            Uri targetUri,
            string resource,
            BlobSasPermissions permissions,
            TimeSpan lifetime,
            CancellationToken ct)
        {
            var userDelegationKey = await _keyCache.GetCacheItemAsync(ct);
            var sasBuilder = new BlobSasBuilder
            {
                BlobContainerName = _containerClient.Name,
                Resource = resource,
                ExpiresOn = DateTimeOffset.UtcNow.Add(lifetime)
            };

            sasBuilder.SetPermissions(permissions);

            var sasToken = sasBuilder
                .ToSasQueryParameters(userDelegationKey, _containerClient.AccountName)
                .ToString();
            var uriBuilder = new UriBuilder(targetUri)
            {
                Query = sasToken
            };

            return uriBuilder.Uri;
        }

        /// <summary>
        /// Requests a new user delegation key valid for <c>READ_TIME_OUT</c> from now.
        /// </summary>
        /// <returns>
        /// The key, paired with the key validity period minus a 30 second tolerance
        /// (presumably the duration <c>AsyncCache</c> keeps the key; verify against it).
        /// </returns>
        private async Task<(TimeSpan, UserDelegationKey)> FetchUserDelegationKey()
        {
            // NOTE(review): the key is valid for READ_TIME_OUT, which is also the longest
            // SAS lifetime requested above.  A SAS signed late in the key's life therefore
            // names an expiry later than the key's own; confirm how the service treats
            // that and whether the key should be refreshed earlier.
            var refreshPeriod = READ_TIME_OUT;
            var tolerance = TimeSpan.FromSeconds(30);
            // Single clock read so that start and expiry are exactly 'refreshPeriod' apart
            var now = DateTimeOffset.UtcNow;
            var key = await _containerClient.GetParentBlobServiceClient().GetUserDelegationKeyAsync(
                now,
                now.Add(refreshPeriod));

            return (refreshPeriod.Subtract(tolerance), key.Value);
        }
    }
    #endregion

    private static readonly TimeSpan READ_TIME_OUT = TimeSpan.FromDays(5);
    private static readonly TimeSpan WRITE_TIME_OUT = TimeSpan.FromMinutes(90);

    // Providers keyed by the string form of their root directory URI
    private readonly IImmutableDictionary<string, DirectoryProvider> _providerMap;

    /// <summary>Creates one provider per staging root directory.</summary>
    /// <param name="stagingStorageDirectories">URIs of the staging root directories.</param>
    /// <param name="credential">Credential shared by every provider.</param>
    public AzureBlobUriProvider(
        IEnumerable<Uri> stagingStorageDirectories,
        TokenCredential credential)
    {
        _providerMap = stagingStorageDirectories
            .Select(u => new DirectoryProvider(u, credential))
            .ToImmutableDictionary(p => p.RootDirectoryUri.ToString(), p => p);
    }

    /// <summary>
    /// Returns one writable (SAS-authorized) folder URI per staging root for the block.
    /// </summary>
    /// <param name="blockKey">Block whose staging folder is requested.</param>
    /// <param name="ct">Cancellation token forwarded to each provider.</param>
    async Task<IEnumerable<Uri>> IStagingBlobUriProvider.GetWritableFolderUrisAsync(
        BlockKey blockKey,
        CancellationToken ct)
    {
        var subDirectory = GetSubDirectory(blockKey);
        var tasks = _providerMap.Values
            .Select(c => c.GetWritableFolderUriAsync(subDirectory, ct))
            .ToImmutableArray();

        await TaskHelper.WhenAllWithErrors(tasks);

        // Presumably every task has completed once WhenAllWithErrors returns,
        // so reading Result does not block.
        var uris = tasks
            .Select(t => t.Result)
            .ToImmutableArray();

        return uris;
    }

    /// <summary>
    /// Returns <paramref name="uri"/> with a read SAS token, using the provider whose
    /// root directory contains it.
    /// </summary>
    /// <param name="uri">URI located under one of the staging root directories.</param>
    /// <param name="ct">Cancellation token forwarded to the provider.</param>
    /// <exception cref="CopyException">
    /// <paramref name="uri"/> isn't under any staging root directory.
    /// </exception>
    async Task<Uri> IStagingBlobUriProvider.AuthorizeUriAsync(Uri uri, CancellationToken ct)
    {
        var uriText = uri.ToString();

        foreach (var (rootText, provider) in _providerMap)
        {
            if (IsUnderRoot(uriText, rootText))
            {
                return await provider.AuthorizeUriAsync(uri, ct);
            }
        }

        throw new CopyException($"Uri isn't from staging directories: '{uri}'", false);
    }

    /// <summary>Deletes the iteration's staging directory under every staging root.</summary>
    /// <param name="iterationKey">Iteration whose staging directory is deleted.</param>
    /// <param name="ct">Cancellation token forwarded to each provider.</param>
    async Task IStagingBlobUriProvider.DeleteStagingDirectoryAsync(
        IterationKey iterationKey,
        CancellationToken ct)
    {
        var subDirectory = GetSubDirectory(iterationKey);
        var tasks = _providerMap.Values
            .Select(c => c.DeleteStagingDirectoryAsync(subDirectory, ct))
            .ToImmutableArray();

        await TaskHelper.WhenAllWithErrors(tasks);
    }

    /// <summary>
    /// Tells whether <paramref name="uriText"/> is <paramref name="rootText"/> itself or
    /// sits below it.  The comparison is ordinal and the match must end on a path
    /// boundary, so that root ".../dir1" doesn't claim ".../dir10/...".
    /// </summary>
    private static bool IsUnderRoot(string uriText, string rootText)
    {
        if (!uriText.StartsWith(rootText, StringComparison.Ordinal))
        {
            return false;
        }

        return uriText.Length == rootText.Length
            || rootText.EndsWith("/", StringComparison.Ordinal)
            || uriText[rootText.Length] == '/';
    }

    /// <summary>Relative staging path of an iteration.</summary>
    private static string GetSubDirectory(IterationKey iterationKey)
    {
        var subDirectoryPath = $"activities/{iterationKey.ActivityName}/" +
            $"iterations/{iterationKey.IterationId:D20}";

        return subDirectoryPath;
    }

    /// <summary>Relative staging path of a block, nested under its iteration's path.</summary>
    private static string GetSubDirectory(BlockKey blockKey)
    {
        var iterationKey = new IterationKey(blockKey.ActivityName, blockKey.IterationId);
        var subDirectoryPath =
            $"{GetSubDirectory(iterationKey)}/blocks/{blockKey.BlockId:D20}";

        return subDirectoryPath;
    }
}
}