public async Task ReadAndWriteTableAsync()

in src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs [349:457]


            /// <summary>
            /// Performs one pass over the partition table: reads every partition lease, lets the helper
            /// methods claim, steal, renew, drain or release each one, writes back every lease for which
            /// at least one of those actions was flagged, and finally calls <c>BalanceLeasesAsync</c>.
            /// </summary>
            /// <param name="isShuttingDown">
            /// When <c>true</c>, each partition is handed to <c>TryDrainAndReleaseAllPartitions</c> instead of
            /// the claim / check-others / renew-or-release path, and <c>ReleasedAllLeases</c> is set on the response.
            /// </param>
            /// <param name="forcefulShutdownToken">
            /// Token passed to the table query, to each entity replace, and to <c>BalanceLeasesAsync</c>.
            /// </param>
            /// <returns>
            /// The response object that was passed to the helpers during the pass. <c>ReleasedAllLeases</c> is
            /// assigned only when <paramref name="isShuttingDown"/> is <c>true</c>, and is <c>true</c> when the
            /// ownership lease count ended at zero.
            /// </returns>
            /// <exception cref="DurableTaskStorageException">
            /// Rethrown after logging when replacing a partition entity fails with HTTP 412 (Precondition Failed).
            /// Other exceptions are not caught here.
            /// </exception>
            public async Task<ReadTableReponse> ReadAndWriteTableAsync(bool isShuttingDown, CancellationToken forcefulShutdownToken)
            {
                var response = new ReadTableReponse();
                var distribution = new Dictionary<string, List<TablePartitionLease>>();
                int ownedCount = 0;

                List<TablePartitionLease> leases = await this.partitionTable
                    .ExecuteQueryAsync<TablePartitionLease>(cancellationToken: forcefulShutdownToken)
                    .ToListAsync();

                foreach (TablePartitionLease lease in leases)
                {
                    // If a worker becomes unhealthy, it may lose a lease without realizing it and keep listening
                    // for messages. Check for that here and stop dequeuing messages if another worker
                    // currently owns the lease.
                    this.service.DropLostControlQueue(lease);

                    // Read the ETag before any helper runs; this is the value handed to ReplaceEntityAsync below.
                    ETag originalETag = lease.ETag;

                    // Only used by the steal-process logs, for leases stolen from a worker that is shutting down.
                    string priorOwner = lease.CurrentOwner ?? this.workerName;

                    // One flag per action a helper may take on this lease during this pass.
                    bool claimed = false;
                    bool stolen = false;
                    bool renewed = false;
                    bool drained = false;
                    bool released = false;

                    if (isShuttingDown)
                    {
                        // Shutdown was requested: drain and release every partition this worker owns.
                        this.TryDrainAndReleaseAllPartitions(lease, response, ref ownedCount, ref released, ref drained, ref renewed);
                    }
                    else
                    {
                        claimed = this.TryClaimLease(lease);
                        this.CheckOtherWorkersLeases(lease, distribution, response, ref ownedCount, ref priorOwner, ref stolen);
                        this.RenewOrReleaseMyLease(lease, response, ref ownedCount, ref released, ref drained, ref renewed);
                    }

                    // Nothing was claimed, stolen, renewed, drained or released: no write is needed for this lease.
                    bool leaseChanged = claimed || stolen || renewed || drained || released;
                    if (!leaseChanged)
                    {
                        continue;
                    }

                    try
                    {
                        await this.partitionTable.ReplaceEntityAsync(lease, originalETag, forcefulShutdownToken);
                    }
                    catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
                    {
                        // Log the ETag mismatch, then let the exception propagate to the caller.
                        this.settings.Logger.PartitionManagerInfo(
                            this.storageAccountName,
                            this.settings.TaskHubName,
                            this.settings.WorkerId,
                            lease.RowKey,
                            $"Failed to update table entry due to an Etag mismatch. Failed ETag value: '{originalETag}'.");
                        throw;
                    }

                    // The worker should be listening to the control queue when either:
                    //   1) it just claimed the lease, or
                    //   2) it is not renewing the lease purely in order to drain the queue. While draining, the
                    //      lease is renewed but new messages must not be read, otherwise the in-memory
                    //      messages would never finish draining.
                    //
                    // NOTE(review): when `released` is true, `stillDraining` evaluates to false, so this branch
                    // is also taken for a lease that was just released (and likewise for a pure steal, where
                    // `renewed` is false). The original comment implies the `!released` check was meant to
                    // prevent resuming listening at release time - confirm against OnTableLeaseAcquiredAsync
                    // whether calling it in those cases is intended.
                    bool stillDraining = renewed && response.IsDrainingPartition && !released;
                    bool shouldListen = claimed || !stillDraining;
                    if (shouldListen)
                    {
                        // Notify the orchestration session manager that a lease was acquired for this partition,
                        // which causes it to start reading control queue messages for that partition.
                        await this.service.OnTableLeaseAcquiredAsync(lease);
                    }

                    this.LogHelper(lease, claimed, stolen, renewed, drained, released, priorOwner);
                }

                // Independently of lease acquisition/renewal, make sure partitions are evenly balanced across workers.
                await this.BalanceLeasesAsync(distribution, leases, ownedCount, response, forcefulShutdownToken);

                // On shutdown, report whether every owned lease has been released (response.ReleasedAllLeases)
                // so that the partition manager loop knows it can stop.
                if (isShuttingDown)
                {
                    response.ReleasedAllLeases = ownedCount == 0;
                }

                return response;
            }