async Task BalanceLeasesAsync()

in src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs [610:707]


            // Steals leases from other workers that own more than the average share so that this worker
            // converges toward an even split of the task hub's partitions.
            //
            // partitionDistribution: leases owned by other workers (presumably keyed by owner worker ID and
            //     excluding this worker, since the divisor below adds 1 for it - TODO confirm against caller).
            // partitions: all partition leases read from the table; only its Count is used here.
            // ownershipLeaseCount: number of leases this worker currently owns. It is a by-value copy that is
            //     incremented locally as steals are attempted.
            // response: IsWaitingForPartitionRelease is set to true after any successful steal write.
            // forceShutdownToken: passed to the table write; once it is cancelled, no further steals are attempted.
            //
            // A DurableTaskStorageException with HTTP 412 (ETag conflict) is rethrown so the caller can re-read
            // the table. Cancellation caused by forceShutdownToken ends balancing early. Any other exception from
            // an individual steal is logged as a warning and swallowed.
            async Task BalanceLeasesAsync(
                Dictionary<string, List<TablePartitionLease>> partitionDistribution,
                IReadOnlyList<TablePartitionLease> partitions,
                int ownershipLeaseCount,
                ReadTableReponse response,
                CancellationToken forceShutdownToken)
            {
                if (partitionDistribution.Count == 0)
                {
                    // No partitions to be balanced.
                    return;
                }

                // Integer division: when partitions do not divide evenly, the remainder is handled below by allowing
                // a worker to own at most average + 1 leases.
                int averageLeasesCount = partitions.Count / (partitionDistribution.Count + 1);
                if (averageLeasesCount < ownershipLeaseCount)
                {
                    // Already have enough leases. Return since there is no need to steal other workers' partitions.
                    return;
                }

                // If this worker does not own enough partitions, search for leases to steal
                foreach (IReadOnlyList<TablePartitionLease> ownedPartitions in partitionDistribution.Values)
                {
                    int numLeasesToSteal = averageLeasesCount - ownershipLeaseCount;
                    if (numLeasesToSteal < 0)
                    {
                        // The current worker already has enough partitions. ownershipLeaseCount only grows, so
                        // later workers cannot change this outcome.
                        break;
                    }

                    // Only steal leases from task hub workers that own more leases than average.
                    // If a given task hub worker's lease count is less or equal to the average, skip it.
                    int numExcessiveLease = ownedPartitions.Count - averageLeasesCount;
                    if (numExcessiveLease <= 0)
                    {
                        continue;
                    }

                    // The balancing condition requires that the differences in the number of leases assigned to each worker should not exceed 1, if the total number of partitions is not evenly divisible by the number of active workers.
                    // Thus, the maximum number of leases a worker can own is the average number of leases per worker plus one in this case.
                    // If a worker has more than one lease difference than average and _this_ worker has not reached the maximum, it should steal an additional lease.
                    if (numLeasesToSteal == 0 && numExcessiveLease > 1)
                    {
                        numLeasesToSteal = 1;
                    }

                    numLeasesToSteal = Math.Min(numLeasesToSteal, numExcessiveLease);
                    for (int i = 0; i < numLeasesToSteal; i++)
                    {
                        // Counted before the write is attempted, so a steal that fails with a non-ETag error still
                        // counts toward this worker's total for the rest of this pass.
                        ownershipLeaseCount++;
                        TablePartitionLease partition = ownedPartitions[i];

                        // Capture the ETag and owner before StealLease mutates the entity, so the conditional
                        // write and the log entry refer to the state that was read from the table.
                        ETag etag = partition.ETag;
                        string previousOwner = partition.CurrentOwner!;
                        this.StealLease(partition);

                        try
                        {
                            await this.partitionTable.ReplaceEntityAsync(
                                partition,
                                etag,
                                forceShutdownToken);

                            this.settings.Logger.LeaseStealingSucceeded(
                                this.storageAccountName,
                                this.settings.TaskHubName,
                                this.settings.WorkerId,
                                previousOwner,
                                leaseType: NotApplicable,
                                partitionId: partition.RowKey);

                            response.IsWaitingForPartitionRelease = true;
                        }
                        catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed /* ETag conflict */)
                        {
                            this.settings.Logger.PartitionManagerInfo(
                                this.storageAccountName,
                                this.settings.TaskHubName,
                                this.settings.WorkerId,
                                partition.RowKey,
                                $"Failed to update table entry due to an Etag mismatch. Failed ETag value: '{etag}'");

                            // Another worker already modified this partition entry. Let the exception bubble up to the main
                            // loop, which will re-read the table immediately to get the latest updates.
                            throw;
                        }
                        catch (OperationCanceledException) when (forceShutdownToken.IsCancellationRequested)
                        {
                            // A forced shutdown was requested. Every further write would fail the same way, so stop
                            // balancing instead of reporting each cancellation as an unexpected error and continuing
                            // to mutate the remaining leases in memory.
                            return;
                        }
                        catch (Exception exception)
                        {
                            // Eat any other exceptions during lease stealing because we want to keep iterating through the partition list.
                            this.settings.Logger.PartitionManagerWarning(
                                this.storageAccountName,
                                this.settings.TaskHubName,
                                this.settings.WorkerId,
                                partition.RowKey,
                                $"Unexpected error occurred in stealing partition lease: {exception}");
                        }
                    }
                }
            }