async Task PartitionManagerLoop()

in src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs [108:222]


        async Task PartitionManagerLoop(CancellationToken gracefulShutdownToken, CancellationToken forcefulShutdownToken)
        {
            const int MaxFailureCount = 10;

            int consecutiveFailureCount = 0;
            bool isShuttingDown = gracefulShutdownToken.IsCancellationRequested;

            while (true)
            {
                TimeSpan timeToSleep = this.options.AcquireInterval;

                try
                {
                    using var timeoutCts = new CancellationTokenSource(this.settings.PartitionTableOperationTimeout);
                    using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(forcefulShutdownToken, timeoutCts.Token);

                    ReadTableReponse response = await this.tableLeaseManager.ReadAndWriteTableAsync(isShuttingDown, linkedCts.Token);

                    // If shutdown is requested and already released all ownership leases, then break the loop. 
                    if (isShuttingDown && response.ReleasedAllLeases)
                    {
                        this.settings.Logger.PartitionManagerInfo(
                            this.storageAccountName,
                            this.settings.TaskHubName,
                            this.settings.WorkerId,
                            partitionId: NotApplicable,
                            "Successfully released all ownership leases for shutdown.");
                        break;
                    }

                    // Poll more frequently if we are draining a partition or waiting for a partition to be released
                    // by another worker. This is a temporary state and we want to try and be as responsive to updates
                    // as possible to minimize the time spent in this state, which is effectively downtime for orchestrations.
                    if (response.IsDrainingPartition || response.IsWaitingForPartitionRelease)
                    {
                        timeToSleep = TimeSpan.FromSeconds(1);
                    }

                    consecutiveFailureCount = 0;
                }
                // Exception Status 412 represents an out of date ETag. We already logged this.
                catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
                {
                    consecutiveFailureCount++;
                }
                // ReadAndWriteTableAsync exceeded the set timeout.
                // This may indicate a transient storage or network issue.
                // The operation will be retried immediately unless it fails more than 10 consecutive times.
                catch (OperationCanceledException) when (!forcefulShutdownToken.IsCancellationRequested)
                {
                    this.settings.Logger.PartitionManagerWarning(
                        this.storageAccountName,
                        this.settings.TaskHubName,
                        this.settings.WorkerId,
                        partitionId: NotApplicable,
                        details: "Operation to read and write the partition table exceeded the 2-second timeout.");

                    consecutiveFailureCount++;
                }
                // Eat any unexpected exceptions.
                catch (Exception exception)
                {
                    this.settings.Logger.PartitionManagerError(
                        this.storageAccountName,
                        this.settings.TaskHubName,
                        this.settings.WorkerId,
                        partitionId: NotApplicable,
                        details: $"Unexpected error occurred while trying to manage table partition leases: {exception}");

                    consecutiveFailureCount++;
                }

                // If table update failed, we re-read the table immediately to obtain the latest ETag.
                // In the case of too many successive failures, we wait before retrying to prevent excessive logs.
                if (consecutiveFailureCount > 0 && consecutiveFailureCount < MaxFailureCount)
                {
                    timeToSleep = TimeSpan.FromSeconds(0);
                }

                try
                {
                    if (isShuttingDown || forcefulShutdownToken.IsCancellationRequested)
                    {
                        // If shutdown is required, we sleep for a short period to ensure a relatively fast shutdown process
                        await Task.Delay(timeToSleep, forcefulShutdownToken);
                    }
                    else
                    {
                        // Normal case: the amount of time we sleep varies depending on the situation.
                        await Task.Delay(timeToSleep, gracefulShutdownToken);
                    }
                }
                catch (OperationCanceledException) when (gracefulShutdownToken.IsCancellationRequested)
                {
                    // Shutdown requested, but we still need to release all leases
                    if (!isShuttingDown)
                    {
                        isShuttingDown = true;
                        this.settings.Logger.PartitionManagerInfo(
                            this.storageAccountName,
                            this.settings.TaskHubName,
                            this.settings.WorkerId,
                            partitionId: NotApplicable,
                            details: $"Requested to cancel partition manager table manage loop. Initiate shutdown process.");
                    }
                }
            }

            this.settings.Logger.PartitionManagerInfo(
                this.storageAccountName,
                this.settings.TaskHubName,
                this.settings.WorkerId,
                partitionId: NotApplicable,
                "Stopped background table partition manager loop.");
        }