in src/DurableTask.AzureStorage/Partitioning/LeaseCollectionBalancer.cs [172:268]
/// <summary>
/// Background loop that periodically renews leases until the balancer is stopped
/// (<c>isStarted != 1</c>) and shutdown has been flagged complete (<c>shutdownComplete</c>).
/// </summary>
/// <remarks>
/// Each iteration:
/// <list type="number">
/// <item><description>Starts a renewal for every lease in <c>currentlyOwnedShards</c> and
/// <c>keepRenewingDuringClose</c> for which <c>shouldRenewLeaseDelegate</c> returns true.</description></item>
/// <item><description>Passes every owned lease that was skipped by the delegate, or whose renewal
/// returned false, to <c>RemoveLeaseAsync(lease, false)</c>.</description></item>
/// <item><description>Removes every shutdown lease whose renewal returned false from
/// <c>keepRenewingDuringClose</c>.</description></item>
/// <item><description>Waits for <c>options.RenewInterval</c>, observing
/// <c>leaseRenewerCancellationTokenSource</c>.</description></item>
/// </list>
/// Exceptions raised inside an iteration are logged and the loop condition is re-evaluated.
/// When the loop exits, both lease collections are cleared.
/// </remarks>
/// <returns>A task that completes when the renewal loop has exited.</returns>
async Task LeaseRenewer()
{
    this.settings.Logger.PartitionManagerInfo(
        this.accountName,
        this.taskHub,
        this.workerName,
        string.Empty /* partitionId */,
        $"Starting background renewal of {this.leaseType} leases with interval: {this.options.RenewInterval}.");

    while (this.isStarted == 1 || !this.shutdownComplete)
    {
        try
        {
            // Both result collections are written from ContinueWith continuations, which may run
            // concurrently with each other, so they must be thread-safe collections.
            var nonRenewedLeases = new ConcurrentBag<T>();
            var failedToRenewShutdownLeases = new ConcurrentBag<T>();
            var renewTasks = new List<Task>();

            // Renew leases for all currently owned partitions in parallel
            foreach (T lease in this.currentlyOwnedShards.Values)
            {
                if (this.shouldRenewLeaseDelegate(lease.PartitionId))
                {
                    renewTasks.Add(this.RenewLeaseAsync(lease).ContinueWith(renewResult =>
                    {
                        if (!renewResult.Result)
                        {
                            // Keep track of all failed attempts to renew so we can trigger shutdown for these partitions
                            nonRenewedLeases.Add(lease);
                        }
                    }));
                }
                else
                {
                    // The delegate declined renewal, so the lease is handled like a failed renewal.
                    nonRenewedLeases.Add(lease);
                }
            }

            // Renew leases for all partitions currently in shutdown
            foreach (T shutdownLease in this.keepRenewingDuringClose.Values)
            {
                if (this.shouldRenewLeaseDelegate(shutdownLease.PartitionId))
                {
                    renewTasks.Add(this.RenewLeaseAsync(shutdownLease).ContinueWith(renewResult =>
                    {
                        if (!renewResult.Result)
                        {
                            // Keep track of all failed attempts to renew shutdown leases so we can remove them from further renew attempts
                            failedToRenewShutdownLeases.Add(shutdownLease);
                        }
                    }));
                }
            }

            // Wait for all renews (and their continuations) to complete before reading the result collections
            await Task.WhenAll(renewTasks);

            // Trigger shutdown of all partitions we did not successfully renew leases for
            await nonRenewedLeases.ParallelForEachAsync(lease => this.RemoveLeaseAsync(lease, false));

            // Now remove all failed renewals of shutdown leases from further renewals
            foreach (T failedToRenewShutdownLease in failedToRenewShutdownLeases)
            {
                T removedLease = null;
                this.keepRenewingDuringClose.TryRemove(failedToRenewShutdownLease.PartitionId, out removedLease);
            }

            await Task.Delay(this.options.RenewInterval, this.leaseRenewerCancellationTokenSource.Token);
        }
        catch (OperationCanceledException)
        {
            this.settings.Logger.PartitionManagerInfo(
                this.accountName,
                this.taskHub,
                this.workerName,
                string.Empty /* partitionId */,
                $"Background renewal task for {this.leaseType} leases was canceled.");
        }
        catch (Exception ex)
        {
            this.settings.Logger.PartitionManagerError(
                this.accountName,
                this.taskHub,
                this.workerName,
                string.Empty,
                $"Failed during {this.leaseType} lease renewal: {ex}");
        }
    }

    // The loop has exited: drop all tracked leases.
    this.currentlyOwnedShards.Clear();
    this.keepRenewingDuringClose.Clear();

    this.settings.Logger.PartitionManagerInfo(
        this.accountName,
        this.taskHub,
        this.workerName,
        string.Empty /* partitionId */,
        $"Background renewer task for {this.leaseType} leases completed.");
}