in src/DurableTask.AzureStorage/Partitioning/LeaseCollectionBalancer.cs [602:673]
async Task AddLeaseAsync(T lease)
{
if (this.currentlyOwnedShards.TryAdd(lease.PartitionId, lease))
{
bool failedToInitialize = false;
try
{
await this.leaseObserverManager.NotifyShardAcquiredAsync(lease);
}
catch (Exception ex)
{
failedToInitialize = true;
// Eat any exceptions during notification of observers
this.settings.Logger.PartitionManagerError(
this.accountName,
this.taskHub,
this.workerName,
lease.PartitionId,
$"Failed to notify observers of {this.leaseType} lease acquisition: {ex}");
}
// We need to release the lease if we fail to initialize the processor, so some other node can pick up the parition
if (failedToInitialize)
{
await this.RemoveLeaseAsync(lease, true);
}
}
else
{
// We already acquired lease for this partition but it looks like we previously owned this partition
// and haven't completed the shutdown process for it yet. Release lease for possible others hosts to
// pick it up.
try
{
this.settings.Logger.PartitionManagerWarning(
this.accountName,
this.taskHub,
this.workerName,
lease.PartitionId,
$"Unable to add {this.leaseType} lease with PartitionId '{lease.PartitionId}' with lease token '{lease.Token}' to currently owned leases.");
await this.RemoveLeaseAsync(lease, true);
this.settings.Logger.LeaseRemoved(
this.accountName,
this.taskHub,
this.workerName,
lease.PartitionId,
lease.Token,
this.leaseType);
}
catch (LeaseLostException)
{
this.settings.Logger.LeaseRemovalFailed(
this.accountName,
this.taskHub,
this.workerName,
lease.PartitionId,
lease.Token,
this.leaseType);
}
catch (Exception ex)
{
this.settings.Logger.PartitionManagerError(
this.accountName,
this.taskHub,
this.workerName,
lease.PartitionId,
$"Encountered a failure when removing a {this.leaseType} lease we previuosly owned: {ex}");
}
}
}