in src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs [349:457]
/// <summary>
/// Reads every partition lease from the partition table and applies this worker's lease logic to each one.
/// During normal operation that means claiming, stealing, renewing, draining or releasing leases; when
/// <paramref name="isShuttingDown"/> is true it means draining and releasing every lease this worker owns.
/// Any lease that changed is written back with the ETag it was read with (optimistic concurrency), and
/// lease ownership is then rebalanced across workers.
/// </summary>
/// <param name="isShuttingDown">True when the worker is shutting down and should give up its leases.</param>
/// <param name="forcefulShutdownToken">Cancels the table query, the entity updates and the rebalancing step.</param>
/// <returns>
/// A response describing this pass. When shutting down, <c>ReleasedAllLeases</c> is true if no owned leases
/// were counted during this pass.
/// </returns>
/// <exception cref="DurableTaskStorageException">
/// Rethrown (after logging) when a lease update fails with HTTP 412 because the entity's ETag no longer matches.
/// </exception>
public async Task<ReadTableReponse> ReadAndWriteTableAsync(bool isShuttingDown, CancellationToken forcefulShutdownToken)
{
    var response = new ReadTableReponse();
    List<TablePartitionLease> partitions = await this.partitionTable
        .ExecuteQueryAsync<TablePartitionLease>(cancellationToken: forcefulShutdownToken)
        .ToListAsync();

    // Populated by CheckOtherWorkersLeases and consumed by BalanceLeasesAsync
    // (presumably keyed by owning worker name — verify against CheckOtherWorkersLeases).
    var partitionDistribution = new Dictionary<string, List<TablePartitionLease>>();

    // Count of leases this worker owns, maintained by the helper methods below.
    int ownershipLeaseCount = 0;

    foreach (TablePartitionLease partition in partitions)
    {
        // If a worker becomes unhealthy, it may lose a lease without realizing it and continue listening
        // for messages. We check for that case here and stop dequeuing messages if we discover that
        // another worker currently owns the lease.
        this.service.DropLostControlQueue(partition);

        bool claimedLease = false;
        bool stoleLease = false;
        bool renewedLease = false;
        bool drainedLease = false;
        bool releasedLease = false;

        // Capture the ETag as read so the replace below fails if the entity was modified in the meantime.
        ETag etag = partition.ETag;

        // previousOwner is only used for the steal-process logs, i.e. when this loop steals a lease
        // from a worker that is shutting down.
        string previousOwner = partition.CurrentOwner ?? this.workerName;

        if (!isShuttingDown)
        {
            claimedLease = this.TryClaimLease(partition);
            this.CheckOtherWorkersLeases(
                partition,
                partitionDistribution,
                response,
                ref ownershipLeaseCount,
                ref previousOwner,
                ref stoleLease);
            this.RenewOrReleaseMyLease(
                partition,
                response,
                ref ownershipLeaseCount,
                ref releasedLease,
                ref drainedLease,
                ref renewedLease);
        }
        else
        {
            // If shutdown is requested, we drain and release all owned partitions.
            this.TryDrainAndReleaseAllPartitions(
                partition,
                response,
                ref ownershipLeaseCount,
                ref releasedLease,
                ref drainedLease,
                ref renewedLease);
        }

        // Only persist (and possibly start listening) when the lease was claimed, stolen, renewed,
        // drained or released in this pass.
        if (!(claimedLease || stoleLease || renewedLease || drainedLease || releasedLease))
        {
            continue;
        }

        try
        {
            await this.partitionTable.ReplaceEntityAsync(partition, etag, forcefulShutdownToken);
        }
        catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
        {
            this.settings.Logger.PartitionManagerInfo(
                this.storageAccountName,
                this.settings.TaskHubName,
                this.settings.WorkerId,
                partition.RowKey,
                $"Failed to update table entry due to an Etag mismatch. Failed ETag value: '{etag}'.");
            throw;
        }

        // Ensure the worker is listening to the control queue iff either:
        // 1) the worker just claimed the lease, or
        // 2) the worker was already the owner in the partitions table, is not actively draining the
        //    queue, and did not just release the lease.
        // During draining we renew the lease but must not listen to new messages; otherwise the
        // in-memory messages would never finish draining.
        // When draining completes, the worker may release the lease. At that moment IsDrainingPartition
        // can still be true while renewedLease is false, so isRenewingToDrainQueue is false. releasedLease
        // must therefore be checked as its own condition. Folding it into isRenewingToDrainQueue only makes
        // that expression false, and the worker would resume listening on the partition it is releasing.
        // NOTE(review): `response` is shared by every partition in this loop, so IsDrainingPartition may
        // reflect an earlier partition — confirm that is intended.
        bool isRenewingToDrainQueue = renewedLease && response.IsDrainingPartition;
        bool shouldListenToControlQueue = claimedLease || (!isRenewingToDrainQueue && !releasedLease);
        if (shouldListenToControlQueue)
        {
            // Notify the orchestration session manager that we acquired a lease for one of the partitions.
            // This will cause it to start reading control queue messages for that partition.
            await this.service.OnTableLeaseAcquiredAsync(partition);
        }

        this.LogHelper(partition, claimedLease, stoleLease, renewedLease, drainedLease, releasedLease, previousOwner);
    }

    // Separately from lease acquisition/renewal, make sure the partitions are evenly balanced across workers.
    await this.BalanceLeasesAsync(partitionDistribution, partitions, ownershipLeaseCount, response, forcefulShutdownToken);

    // While shutting down, report whether every owned lease has been released (no owned leases counted)
    // by setting response.ReleasedAllLeases; this tells the partition manager loop it can stop.
    if (isShuttingDown)
    {
        response.ReleasedAllLeases = ownershipLeaseCount == 0;
    }

    return response;
}