code/KustoCopyConsole/Storage/AzureStorage/BlobLock.cs (63 lines of code) (raw):

using Azure; using Azure.Storage.Blobs.Specialized; using System.Threading; namespace KustoCopyConsole.Storage.AzureStorage { internal class BlobLock : IAsyncDisposable { #if DEBUG private static readonly TimeSpan DEFAULT_LEASE_DURATION = TimeSpan.FromSeconds(20); private static readonly TimeSpan DEFAULT_LEASE_RENEWAL_PERIOD = TimeSpan.FromSeconds(10); #else private static readonly TimeSpan DEFAULT_LEASE_DURATION = TimeSpan.FromSeconds(60); private static readonly TimeSpan DEFAULT_LEASE_RENEWAL_PERIOD = TimeSpan.FromSeconds(40); #endif private readonly Task _backgroundTask; private readonly TaskCompletionSource _backgroundCompletedSource = new(); #region Constructors private BlobLock(BlobLeaseClient leaseClient, CancellationToken ct) { LeaseClient = leaseClient; _backgroundTask = Task.Run(() => BackGroundRenewLockAsync(ct)); } internal static async Task<BlobLock> CreateAsync( BlobBaseClient blobClient, CancellationToken ct) { var leaseClient = blobClient.GetBlobLeaseClient(); try { await leaseClient.AcquireAsync(DEFAULT_LEASE_DURATION); return new BlobLock(leaseClient, ct); } catch (RequestFailedException ex) { if (ex.ErrorCode == "LeaseAlreadyPresent") { throw new CopyException("Lease on blob already present", false); } else { throw; } } } #endregion public BlobLeaseClient LeaseClient { get; } async ValueTask IAsyncDisposable.DisposeAsync() { _backgroundCompletedSource.SetResult(); await _backgroundTask; } private async Task BackGroundRenewLockAsync(CancellationToken ct) { while (!_backgroundCompletedSource.Task.IsCompleted) { await Task.WhenAny( Task.Delay(DEFAULT_LEASE_RENEWAL_PERIOD, ct), _backgroundCompletedSource.Task); await LeaseClient.RenewAsync(null, ct); } } } }