dotnet/src/Azure.Iot.Operations.Services/LeasedLock/LeasedLockClient.cs (357 lines of code) (raw):

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Azure.Iot.Operations.Services.StateStore;
using Azure.Iot.Operations.Protocol;
using System.Diagnostics;
using Azure.Iot.Operations.Protocol.Retry;

namespace Azure.Iot.Operations.Services.LeasedLock
{
    /// <summary>
    /// A client to facilitate leased lock operations on a specific, provided lock name.
    /// </summary>
    /// <remarks>
    /// <para>
    /// Once acquired, a lock will not be automatically renewed by default. This client allows you to opt-in to auto-renew
    /// with <see cref="AutomaticRenewalOptions"/>, though.
    /// </para>
    /// <para>
    /// When a lock is granted via <see cref="AcquireLockAsync(TimeSpan, AcquireLockRequestOptions?, CancellationToken)"/>,
    /// the service will respond with a fencing token via <see cref="AcquireLockResponse.FencingToken"/>. This fencing token
    /// allows for State Store set/delete operations on shared resources without risk of race conditions.
    /// </para>
    /// </remarks>
    public class LeasedLockClient : IAsyncDisposable
    {
        private readonly IStateStoreClient _stateStoreClient;
        private readonly string _lockKey;

        // Format of the lock's value when a session id is supplied: {lockHolderName}:{sessionId}
        private const string ValueFormat = "{0}:{1}";

        // The retry policy used by AcquireLockAsync auto-resets after (lease duration * this coefficient).
        private const int _retryResetIntervalCoefficient = 4;

        // Settings for the default retry policy used when the caller does not supply one.
        private readonly TimeSpan _retryPolicyMaxWait = TimeSpan.FromMilliseconds(200);
        private const uint _retryPolicyBaseExponent = 1;
        private const uint _retryPolicyMaxRetries = 5;

        private System.Timers.Timer? _automaticRenewalTimer;
        private CancellationTokenSource? _renewalTimerCancellationToken;
        private LeasedLockAutomaticRenewalOptions _automaticRenewalOptions = new LeasedLockAutomaticRenewalOptions() { AutomaticRenewal = false };
        private bool _disposed = false;

        // Completed by OnKeyChangeNotification when the lock key is deleted, which wakes up AcquireLockAsync.
        private TaskCompletionSource? _lockFreeToAcquireTaskCompletionSource;

        // True while the user has explicitly asked to observe the lock via ObserveLockAsync.
        private bool _isObservingLock = false;

        private readonly IRetryPolicy _retryPolicy;

        // Lease duration used by AcquireLockAndUpdateValueAsync when the caller does not provide one.
        private readonly TimeSpan _maximumDefaultLeaseDuration = TimeSpan.FromSeconds(5);

        /// <summary>
        /// The callback that executes whenever the current holder of the lock changes.
        /// </summary>
        /// <remarks>
        /// Users who want to watch lock holder change events must first set this callback, then
        /// call <see cref="ObserveLockAsync(CancellationToken)"/>.
        /// To stop watching lock holder change events, call <see cref="UnobserveLockAsync(CancellationToken)"/>
        /// and then remove any handlers from this object.
        /// </remarks>
        public event Func<object?, LockChangeEventArgs, Task>? LockChangeEventReceivedAsync;

        /// <summary>
        /// The name this client uses when trying to acquire the leased lock.
        /// </summary>
        public string LockHolderName { get; private set; }

        /// <summary>
        /// The options for automatically re-acquiring a lock before the previous lease expires. By default,
        /// no automatic re-acquiring happens.
        /// </summary>
        /// <remarks>
        /// <para>
        /// These options must be set before calling <see cref="AcquireLockAsync(TimeSpan, AcquireLockRequestOptions?, CancellationToken)"/>.
        /// Once set, the automatic renewal will begin after the first call to <see cref="AcquireLockAsync(TimeSpan, AcquireLockRequestOptions?, CancellationToken)"/>.
        /// </para>
        /// <para>
        /// Automatic renewal will continue for as long as the lock can be re-acquired. If another party acquires the lock, then this party's auto-renewal
        /// will end. In this case, users should use <see cref="AcquireLockAsync(TimeSpan, AcquireLockRequestOptions?, CancellationToken)"/> to acquire the lock
        /// instead to avoid polling.
        /// </para>
        /// <para>
        /// The result of automatic renewals can be accessed via <see cref="MostRecentAcquireLockResponse"/>.
        /// </para>
        /// </remarks>
        /// <exception cref="ObjectDisposedException">If this client has been disposed.</exception>
        /// <exception cref="ArgumentNullException">If the assigned value is null.</exception>
        public LeasedLockAutomaticRenewalOptions AutomaticRenewalOptions
        {
            get
            {
                ObjectDisposedException.ThrowIf(_disposed, this);
                return _automaticRenewalOptions;
            }
            set
            {
                ObjectDisposedException.ThrowIf(_disposed, this);
                ArgumentNullException.ThrowIfNull(value, nameof(value));
                _automaticRenewalOptions = value;

                if (!_automaticRenewalOptions.AutomaticRenewal)
                {
                    // stop any subsequent automatic renewals
                    CancelAutomaticRenewal();
                }
            }
        }

        /// <summary>
        /// The result of the most recent attempt at acquiring the lock.
        /// </summary>
        /// <remarks>
        /// This value captures the result of automatic renewal of the lock with <see cref="AutomaticRenewalOptions"/>.
        /// </remarks>
        public AcquireLockResponse? MostRecentAcquireLockResponse { get; private set; }

        /// <summary>
        /// Construct a new leased lock client.
        /// </summary>
        /// <param name="applicationContext">The application context containing shared resources.</param>
        /// <param name="mqttClient">The client to use for I/O operations.</param>
        /// <param name="lockName">The name of the lock to acquire/release.</param>
        /// <param name="lockHolderName">The name for this client that will hold a lock. Other processes
        /// will be able to check which client holds a lock by name. By default, this is set to the MQTT client ID.
        /// </param>
        /// <param name="retryPolicy">
        /// The policy used to add extra wait time after the lease becomes available to acquire. When omitted,
        /// an exponential backoff policy is used.
        /// </param>
        /// <exception cref="ArgumentException">If <paramref name="lockName"/> is null or empty.</exception>
        /// <exception cref="ArgumentNullException">
        /// If neither <paramref name="lockHolderName"/> nor the MQTT client Id is available.
        /// </exception>
        public LeasedLockClient(ApplicationContext applicationContext, IMqttPubSubClient mqttClient, string lockName, string? lockHolderName = null, IRetryPolicy? retryPolicy = null)
        {
            if (string.IsNullOrEmpty(lockName))
            {
                throw new ArgumentException("Must provide a non-null, non-empty lock name");
            }

            _stateStoreClient = new StateStoreClient(applicationContext, mqttClient);
            _lockKey = lockName;
            _retryPolicy = retryPolicy ?? new ExponentialBackoffRetryPolicy(
                _retryPolicyMaxRetries,
                _retryPolicyBaseExponent,
                _retryPolicyMaxWait);

            LockHolderName = lockHolderName
                ?? mqttClient.ClientId
                ?? throw new ArgumentNullException(nameof(mqttClient.ClientId), "Must provide either a non-null MQTT client Id or a non-null lock holder name");

            _automaticRenewalOptions = new LeasedLockAutomaticRenewalOptions();

            _stateStoreClient.KeyChangeMessageReceivedAsync += OnKeyChangeNotification;
        }

        /// <summary>
        /// Construct a new leased lock client on top of an existing state store client.
        /// </summary>
        /// <param name="stateStoreClient">The state store client used for all lock operations.</param>
        /// <param name="lockName">The name of the lock to acquire/release.</param>
        /// <param name="lockHolderName">The name for this client that will hold a lock.</param>
        /// <param name="retryPolicy">
        /// The policy used to add extra wait time after the lease becomes available to acquire. When omitted,
        /// an exponential backoff policy is used.
        /// </param>
        /// <exception cref="ArgumentNullException">If <paramref name="stateStoreClient"/> is null.</exception>
        /// <exception cref="ArgumentException">If <paramref name="lockName"/> is null or empty.</exception>
        public LeasedLockClient(IStateStoreClient stateStoreClient, string lockName, string lockHolderName, IRetryPolicy? retryPolicy = null)
        {
            // Fail with a descriptive exception rather than a NullReferenceException at the event subscription below.
            ArgumentNullException.ThrowIfNull(stateStoreClient);

            if (string.IsNullOrEmpty(lockName))
            {
                throw new ArgumentException("Must provide a non-null, non-empty lock name");
            }

            _stateStoreClient = stateStoreClient;
            _lockKey = lockName;
            _retryPolicy = retryPolicy ?? new ExponentialBackoffRetryPolicy(
                _retryPolicyMaxRetries,
                _retryPolicyBaseExponent,
                _retryPolicyMaxWait);

            LockHolderName = lockHolderName;

            _automaticRenewalOptions = new LeasedLockAutomaticRenewalOptions();

            _stateStoreClient.KeyChangeMessageReceivedAsync += OnKeyChangeNotification;
        }

        /// <summary>
        /// Parameterless constructor that initializes the client with an empty lock name and lock holder name.
        /// </summary>
        /// <remarks>
        /// NOTE(review): presumably exists so that this class can be mocked in tests - confirm.
        /// </remarks>
        internal LeasedLockClient()
        {
            _retryPolicy = new ExponentialBackoffRetryPolicy(
                _retryPolicyMaxRetries,
                _retryPolicyBaseExponent,
                _retryPolicyMaxWait);
            _stateStoreClient = new StateStoreClient();
            _lockKey = string.Empty;
            LockHolderName = string.Empty;
        }

        /// <summary>
        /// Attempt to acquire a lock with the provided name.
        /// </summary>
        /// <param name="leaseDuration">The duration for which the lock will be held. This value only has millisecond-level precision.</param>
        /// <param name="options">The lock request options.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns>AcquireLockResponse object with result (and fencing token if the lock was successfully acquired.)</returns>
        /// <remarks>
        /// <para>
        /// Once acquired, a lock will not be automatically renewed by default. This client allows you to opt-in to auto-renew
        /// with <see cref="AutomaticRenewalOptions"/>, though. When auto-renewal is enabled, renewals reuse the
        /// <paramref name="options"/> of the call that acquired the lock.
        /// </para>
        /// <para>
        /// When acquired, a lock has a value assigned to it which follows either the format
        /// {lockHolderName}:{sessionId} if a sessionId is provided by <paramref name="options"/> or
        /// just {lockHolderName} if no sessionId is provided. The lock holder name is chosen
        /// when constructing this client and a sessionId can be chosen (or omitted, by default)
        /// each attempt to acquire a lock.
        /// </para>
        /// </remarks>
        public virtual async Task<AcquireLockResponse> TryAcquireLockAsync(TimeSpan leaseDuration, AcquireLockRequestOptions? options = null, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            ObjectDisposedException.ThrowIf(_disposed, this);

            AcquireLockResponse acquireLockResponse =
                await TryAcquireLockWithoutEnablingAutoRenewalAsync(leaseDuration, options, cancellationToken).ConfigureAwait(false);

            if (acquireLockResponse.Success && AutomaticRenewalOptions.AutomaticRenewal)
            {
                // Renewals must write the same lock value (including any session id) as this request did.
                // Otherwise the conditional set used for renewal would not match the stored value and would fail.
                EnableAutomaticRenewal(options);
            }

            return acquireLockResponse;
        }

        /// <summary>
        /// Send a single conditional set request for the lock key and record the result in
        /// <see cref="MostRecentAcquireLockResponse"/>. Does not touch the automatic renewal timer.
        /// </summary>
        private async Task<AcquireLockResponse> TryAcquireLockWithoutEnablingAutoRenewalAsync(TimeSpan leaseDuration, AcquireLockRequestOptions? options = null, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();

            options ??= new AcquireLockRequestOptions();

            StateStoreValue value = CreateLockValue(options.SessionId);

            Debug.Assert(_lockKey != null);
            StateStoreSetResponse setResponse = await _stateStoreClient.SetAsync(
                _lockKey,
                value,
                new StateStoreSetRequestOptions()
                {
                    // Succeeds when the lock is free or when the stored value already equals this value (renewal).
                    Condition = SetCondition.OnlyIfEqualOrNotSet,
                    ExpiryTime = leaseDuration,
                },
                cancellationToken: cancellationToken).ConfigureAwait(false);

            MostRecentAcquireLockResponse = new AcquireLockResponse(
                setResponse.Version,
                setResponse.Success);

            return MostRecentAcquireLockResponse;
        }

        /// <summary>
        /// Build the value stored under the lock key: {lockHolderName}:{sessionId} when a session id
        /// is provided, otherwise just {lockHolderName}.
        /// </summary>
        private StateStoreValue CreateLockValue(string? sessionId)
        {
            return string.IsNullOrEmpty(sessionId)
                ? new StateStoreValue(LockHolderName)
                : new StateStoreValue(string.Format(ValueFormat, LockHolderName, sessionId));
        }

        /// <summary>
        /// Start (or restart) the timer that periodically re-acquires the lock using the current
        /// <see cref="AutomaticRenewalOptions"/>.
        /// </summary>
        /// <param name="options">The request options to reuse for every renewal attempt.</param>
        private void EnableAutomaticRenewal(AcquireLockRequestOptions? options = null)
        {
            // Cancel any previous auto-renewal timer + cancellation token
            CancelAutomaticRenewal();

            var renewalTimer = new System.Timers.Timer
            {
                Interval = AutomaticRenewalOptions.RenewalPeriod.TotalMilliseconds,
            };

            var renewalCancellation = new CancellationTokenSource();

            // Capture the token once. Reading the field from the timer callback could observe a source that has
            // since been disposed or replaced by a later call to this method.
            CancellationToken renewalToken = renewalCancellation.Token;

            renewalTimer.Elapsed += async (sender, args) =>
            {
                try
                {
                    // The helper records the result in MostRecentAcquireLockResponse.
                    AcquireLockResponse renewalResponse = await TryAcquireLockWithoutEnablingAutoRenewalAsync(
                        AutomaticRenewalOptions.LeaseTermLength,
                        options,
                        renewalToken).ConfigureAwait(false);

                    if (!renewalResponse.Success)
                    {
                        // The lock could not be renewed, so stop trying.
                        CancelAutomaticRenewal();
                    }
                }
                catch (OperationCanceledException)
                {
                    // The automatic renewal was cancelled, so ignore this error.
                }
                catch (ObjectDisposedException)
                {
                    // The client was disposed while attempting automatic renewal. Ignore
                    // this exception because the automatic renewal should not continue.
                }
                catch (AkriMqttException)
                {
                    // May be thrown if the client is disposed mid-request. Safe to ignore because
                    // the client doesn't need the response anymore.
                }
                catch (Exception)
                {
                    // This default case covers for any unexpectedly thrown exceptions. Since users can dependency inject
                    // their own MQTT client into this library, we have no way of knowing what exceptions could bubble up.
                    // Like the other catch cases, though, nothing needs to be done here. If a transient error occurred,
                    // then the next time the timer wakes up a renewal request will be re-attempted. If a non-transient error
                    // occurred or if the client is done automatically renewing, then it is irrelevant if this attempt succeeded
                    // or failed.
                }
            };

            _renewalTimerCancellationToken = renewalCancellation;
            _automaticRenewalTimer = renewalTimer;
            renewalTimer.Start();
        }

        /// <summary>
        /// Await until this client has acquired the lock or cancellation is requested.
        /// </summary>
        /// <param name="leaseDuration">The duration for which the lock will be held if the lock is acquired. This value only has millisecond-level precision.</param>
        /// <param name="options">The lock request options.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns>The service response object containing the fencing token if the lock was successfully acquired.</returns>
        /// <exception cref="RetryExpiredException">If the retry policy is exhausted before the lock is acquired.</exception>
        /// <remarks>
        /// <para>
        /// Once acquired, a lock will not be automatically renewed by default. This client allows you to opt-in to auto-renew
        /// with <see cref="AutomaticRenewalOptions"/>, though.
        /// </para>
        /// <para>
        /// When acquired, a lock has a value assigned to it which follows the format: {lockHolderName}:{sessionId}. The lock
        /// holder name is chosen when constructing this client and a sessionId can be chosen each attempt to acquire a lock.
        /// </para>
        /// <para>
        /// This function does not rely on continuous, active polling. Instead, it relies on receiving
        /// notifications about when it is possible to acquire this lock. Since it is possible
        /// that a lock may never be acquired, it is highly recommended to provide a cancellation token.
        /// </para>
        /// </remarks>
        public virtual async Task<AcquireLockResponse> AcquireLockAsync(TimeSpan leaseDuration, AcquireLockRequestOptions? options = null, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            ObjectDisposedException.ThrowIf(_disposed, this);

            options ??= new AcquireLockRequestOptions();

            // Run continuations asynchronously so that completing this source from the key change
            // notification handler does not run the rest of this method inline on the notifying thread.
            _lockFreeToAcquireTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

            try
            {
                if (!_isObservingLock)
                {
                    Debug.Assert(_lockKey != null);

                    // The user may already be observing the lock separately from this single attempt to acquire the lock, so don't
                    // observe it if the user is already observing it.
                    await _stateStoreClient.ObserveAsync(_lockKey, cancellationToken: cancellationToken).ConfigureAwait(false);
                }

                var retryPolicy = new RetryPolicyWithAutoReset(_retryPolicy, expirationInterval: leaseDuration * _retryResetIntervalCoefficient);
                AcquireLockResponse response;
                do
                {
                    response = await TryAcquireLockAsync(leaseDuration, options, cancellationToken).ConfigureAwait(false);

                    // The set call failed to acquire the lock. Now this process will wait to be notified when
                    // the key's state has changed to deleted before attempting to acquire it again.
                    if (!response.Success)
                    {
                        await _lockFreeToAcquireTaskCompletionSource.Task.WaitAsync(cancellationToken).ConfigureAwait(false);
                        _lockFreeToAcquireTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

                        // The lock is now available to be acquired. Renewal by the previous holder is prioritized,
                        // so wait a bit to make sure the old holder has enough time to renew.
                        bool shouldRetry = retryPolicy.ShouldRetry(null, out TimeSpan retryDelay);
                        if (shouldRetry)
                        {
                            await Task.Delay(retryDelay, cancellationToken).ConfigureAwait(false);
                        }
                        else
                        {
                            throw new RetryExpiredException("Retry policy was exhausted when trying to acquire the lock");
                        }
                    }
                } while (!response.Success);

                retryPolicy.Reset();

                return response;
            }
            finally
            {
                if (!_isObservingLock)
                {
                    Debug.Assert(_lockKey != null);

                    // The user may be observing the lock separately from this single attempt to acquire the lock, so don't
                    // unobserve it if the user is still observing it.
                    await _stateStoreClient.UnobserveAsync(_lockKey, cancellationToken: cancellationToken).ConfigureAwait(false);
                }
            }
        }

        /// <summary>
        /// Block until the lock is acquired, update the value of the state store resource based on
        /// its current value, then release the lock.
        /// </summary>
        /// <param name="key">The state store key whose value will be updated.</param>
        /// <param name="updateValueFunc">
        /// The function to execute after the lock is acquired. The parameter of this function contains
        /// the current value of the state store key. The return value of this function is the new value
        /// that you wish the state store key to have. Returning null deletes the key.
        /// </param>
        /// <param name="maximumLeaseDuration">
        /// The maximum length of time that the client will lease the lock for once acquired. Under normal circumstances,
        /// this function will release the lock after updating the value of the shared resource, but it is possible that
        /// this client is interrupted or encounters a fatal exception. By setting a low value for this field, you limit
        /// how long the lock can be acquired for before it is released automatically by the service.
        /// Defaults to 5 seconds.
        /// </param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <exception cref="ArgumentNullException">
        /// If <paramref name="key"/> or <paramref name="updateValueFunc"/> is null.
        /// </exception>
        /// <exception cref="ObjectDisposedException">If this client has been disposed.</exception>
        /// <remarks>
        /// This function will always release the lock if it was acquired. Even if cancellation is requested
        /// when the lock is acquired, this function will release the lock.
        /// </remarks>
        public async Task AcquireLockAndUpdateValueAsync(StateStoreKey key, Func<StateStoreValue?, StateStoreValue?> updateValueFunc, TimeSpan? maximumLeaseDuration = null, CancellationToken cancellationToken = default)
        {
            // Validate up front so that the lock is never acquired for a request that cannot succeed.
            ArgumentNullException.ThrowIfNull(key);
            ArgumentNullException.ThrowIfNull(updateValueFunc);
            ObjectDisposedException.ThrowIf(_disposed, this);

            TimeSpan leaseDurationVerified = maximumLeaseDuration ?? _maximumDefaultLeaseDuration;

            // The lock may need to be acquired multiple times before the key is successfully updated.
            bool valueChanged = false;
            while (!valueChanged)
            {
                cancellationToken.ThrowIfCancellationRequested();
                AcquireLockResponse acquireLockResponse = await AcquireLockAsync(leaseDurationVerified, cancellationToken: cancellationToken);

                if (!acquireLockResponse.Success)
                {
                    continue;
                }

                try
                {
                    StateStoreGetResponse getResponse = await _stateStoreClient.GetAsync(key, cancellationToken: cancellationToken);

                    StateStoreValue? newValue = updateValueFunc.Invoke(getResponse.Value);

                    if (newValue == null)
                    {
                        // A null result from the update function means the key should be deleted.
                        var deleteOptions = new StateStoreDeleteRequestOptions()
                        {
                            FencingToken = acquireLockResponse.FencingToken,
                        };

                        StateStoreDeleteResponse deleteResponse = await _stateStoreClient.DeleteAsync(key, deleteOptions, cancellationToken: cancellationToken);
                        valueChanged = deleteResponse.DeletedItemsCount == 1;
                    }
                    else
                    {
                        var setOptions = new StateStoreSetRequestOptions()
                        {
                            FencingToken = acquireLockResponse.FencingToken,
                        };

                        StateStoreSetResponse setResponse = await _stateStoreClient.SetAsync(key, newValue, setOptions, cancellationToken: cancellationToken);
                        valueChanged = setResponse.Success;
                    }
                }
                finally
                {
                    // Cancellation may have been requested while the lock was acquired. Even in that
                    // case, this function still needs to release the lock prior to returning
                    // so never pass along the user-supplied cancellation token.
                    //
                    // Also note that this request may fail if this process no longer owns the lock,
                    // but that case isn't a problem because that is the desired state. Because of that,
                    // there is no need to check the return value here.
                    await ReleaseLockAsync(cancellationToken: CancellationToken.None);
                }
            }
        }

        /// <summary>
        /// Get the current holder of the lock.
        /// </summary>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns>
        /// The details about the current holder of the lock. The response wraps null when the lock key has no value.
        /// </returns>
        /// <remarks>
        /// When acquired, a lock has a value assigned to it which follows the format: {lockHolderName}:{sessionId}.
        /// This function will return this value so that the owner of the lock can be identified by one or both
        /// of these fields.
        /// </remarks>
        public virtual async Task<GetLockHolderResponse> GetLockHolderAsync(CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            ObjectDisposedException.ThrowIf(_disposed, this);

            Debug.Assert(_lockKey != null);
            StateStoreGetResponse getResponse = await _stateStoreClient.GetAsync(
                _lockKey,
                cancellationToken: cancellationToken).ConfigureAwait(false);

            if (getResponse.Value == null)
            {
                return new GetLockHolderResponse(null);
            }

            return new GetLockHolderResponse(new LeasedLockHolder(getResponse.Value.Bytes));
        }

        /// <summary>
        /// Attempt to release a lock with the provided name.
        /// </summary>
        /// <param name="options">
        /// The release request options. The session id must match the one used to acquire the lock, because
        /// the lock is only deleted when its stored value equals the value built from these options.
        /// </param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns>The response to the request.</returns>
        public virtual async Task<ReleaseLockResponse> ReleaseLockAsync(ReleaseLockRequestOptions? options = null, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            ObjectDisposedException.ThrowIf(_disposed, this);

            options ??= new ReleaseLockRequestOptions();

            StateStoreValue value = CreateLockValue(options.SessionId);

            Debug.Assert(_lockKey != null);
            StateStoreDeleteResponse deleteResponse = await _stateStoreClient.DeleteAsync(
                _lockKey,
                new StateStoreDeleteRequestOptions()
                {
                    // Only release the lock if this client (and session) still holds it.
                    OnlyDeleteIfValueEquals = value,
                },
                cancellationToken: cancellationToken).ConfigureAwait(false);

            if (options.CancelAutomaticRenewal)
            {
                CancelAutomaticRenewal();
            }

            return new ReleaseLockResponse(deleteResponse.DeletedItemsCount > 0);
        }

        /// <summary>
        /// Start receiving notifications when the lock holder changes for this leased lock.
        /// </summary>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <remarks>
        /// Users who want to watch lock holder change events must first set one or more handlers on
        /// <see cref="LockChangeEventReceivedAsync"/>, then call this function.
        /// To stop watching lock holder change events, call <see cref="UnobserveLockAsync(CancellationToken)"/>
        /// and then remove any handlers from <see cref="LockChangeEventReceivedAsync"/>.
        /// </remarks>
        public virtual async Task ObserveLockAsync(CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            ObjectDisposedException.ThrowIf(_disposed, this);

            Debug.Assert(_lockKey != null);
            await _stateStoreClient.ObserveAsync(
                _lockKey,
                cancellationToken: cancellationToken).ConfigureAwait(false);

            _isObservingLock = true;
        }

        /// <summary>
        /// Stop receiving notifications when the lock holder changes.
        /// </summary>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <remarks>
        /// Users who want to watch lock holder change events must first set one or more handlers on
        /// <see cref="LockChangeEventReceivedAsync"/>, then call <see cref="ObserveLockAsync(CancellationToken)"/>.
        /// To stop watching lock holder change events, call this function
        /// and then remove any handlers from <see cref="LockChangeEventReceivedAsync"/>.
        /// </remarks>
        public virtual async Task UnobserveLockAsync(CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            ObjectDisposedException.ThrowIf(_disposed, this);

            Debug.Assert(_lockKey != null);
            await _stateStoreClient.UnobserveAsync(_lockKey, cancellationToken: cancellationToken).ConfigureAwait(false);

            _isObservingLock = false;
        }

        /// <summary>
        /// Dispose this client without disposing the underlying state store client.
        /// </summary>
        public async ValueTask DisposeAsync()
        {
            await DisposeAsyncCore(false).ConfigureAwait(false);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Dispose this client and, optionally, the underlying state store client.
        /// </summary>
        /// <param name="disposing">True to also dispose the underlying state store client.</param>
        public async ValueTask DisposeAsync(bool disposing)
        {
            await DisposeAsyncCore(disposing).ConfigureAwait(false);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Unsubscribe from key change notifications, stop automatic renewal and, when
        /// <paramref name="disposing"/> is true, dispose the underlying state store client.
        /// Calling this more than once has no further effect.
        /// </summary>
        /// <param name="disposing">True to also dispose the underlying state store client.</param>
        protected virtual async ValueTask DisposeAsyncCore(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            _stateStoreClient.KeyChangeMessageReceivedAsync -= OnKeyChangeNotification;

            CancelAutomaticRenewal();

            if (disposing)
            {
                await _stateStoreClient.DisposeAsync(disposing).ConfigureAwait(false);
            }

            _disposed = true;
        }

        /// <summary>
        /// Handle a key change notification from the state store client: raise
        /// <see cref="LockChangeEventReceivedAsync"/> and wake up any pending
        /// <see cref="AcquireLockAsync(TimeSpan, AcquireLockRequestOptions?, CancellationToken)"/> call
        /// when the lock key was deleted.
        /// </summary>
        private Task OnKeyChangeNotification(object? sender, KeyChangeMessageReceivedEventArgs keyChangeEventArgs)
        {
            // The state store client may be shared and observing other keys as well. Notifications
            // about those keys say nothing about this lock, so they must not be reported as lock changes.
            if (!keyChangeEventArgs.ChangedKey.Equals(_lockKey))
            {
                return Task.CompletedTask;
            }

            bool isLockAvailable = keyChangeEventArgs.NewState == KeyState.Deleted;

            var lockChangeArgs = new LockChangeEventArgs(
                isLockAvailable ? LockState.Released : LockState.Acquired,
                keyChangeEventArgs.Timestamp);

            if (keyChangeEventArgs.NewValue != null)
            {
                lockChangeArgs.NewLockHolder = new LeasedLockHolder(keyChangeEventArgs.NewValue.Bytes);
            }

            // The handlers' task is intentionally not awaited here.
            _ = LockChangeEventReceivedAsync?.Invoke(sender, lockChangeArgs);

            // A deleted lock key means the lock is available to be acquired, so wake up the
            // caller that is waiting for that in AcquireLockAsync (if there is one).
            if (isLockAvailable)
            {
                _lockFreeToAcquireTaskCompletionSource?.TrySetResult();
            }

            return Task.CompletedTask;
        }

        /// <summary>
        /// Stop and dispose the automatic renewal timer and its cancellation token source, if any.
        /// Safe to call repeatedly.
        /// </summary>
        private void CancelAutomaticRenewal()
        {
            // Detach the current timer and token source from the fields first so that later calls
            // do not operate on already-disposed objects.
            System.Timers.Timer? renewalTimer = _automaticRenewalTimer;
            CancellationTokenSource? renewalCancellation = _renewalTimerCancellationToken;
            _automaticRenewalTimer = null;
            _renewalTimerCancellationToken = null;

            try
            {
                // Cancel before disposing so that an in-flight renewal request observes the cancellation.
                renewalCancellation?.Cancel();
                renewalTimer?.Stop();
                renewalCancellation?.Dispose();
                renewalTimer?.Dispose();
            }
            catch (ObjectDisposedException)
            {
                // This object is already disposed, so there is nothing to cancel
            }
        }
    }
}