in src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs [108:222]
/// <summary>
/// Background loop that repeatedly calls <c>ReadAndWriteTableAsync</c> to manage this worker's
/// table partition leases, sleeping between iterations.
/// </summary>
/// <param name="gracefulShutdownToken">
/// When cancelled, the loop switches into shutdown mode (passing <c>isShuttingDown: true</c> to
/// <c>ReadAndWriteTableAsync</c>) and keeps running until the response reports that all ownership
/// leases were released.
/// </param>
/// <param name="forcefulShutdownToken">
/// Cancels any in-flight table operation and stops the loop immediately, even if not all ownership
/// leases have been confirmed as released.
/// </param>
async Task PartitionManagerLoop(CancellationToken gracefulShutdownToken, CancellationToken forcefulShutdownToken)
{
    // After this many consecutive failures we stop retrying immediately and fall back to the
    // regular acquire interval, to avoid excessive retries and log noise.
    const int MaxFailureCount = 10;
    const string ForcefulShutdownDetails =
        "Forceful shutdown requested before all ownership leases were confirmed released. Stopping partition manager table manage loop.";

    int consecutiveFailureCount = 0;
    bool isShuttingDown = gracefulShutdownToken.IsCancellationRequested;

    while (true)
    {
        TimeSpan timeToSleep = this.options.AcquireInterval;

        try
        {
            // Bound each table round-trip by the configured operation timeout, and abort it
            // right away if a forceful shutdown is requested.
            using var timeoutCts = new CancellationTokenSource(this.settings.PartitionTableOperationTimeout);
            using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(forcefulShutdownToken, timeoutCts.Token);
            ReadTableReponse response = await this.tableLeaseManager.ReadAndWriteTableAsync(isShuttingDown, linkedCts.Token);

            // If shutdown is requested and already released all ownership leases, then break the loop.
            if (isShuttingDown && response.ReleasedAllLeases)
            {
                this.settings.Logger.PartitionManagerInfo(
                    this.storageAccountName,
                    this.settings.TaskHubName,
                    this.settings.WorkerId,
                    partitionId: NotApplicable,
                    "Successfully released all ownership leases for shutdown.");
                break;
            }

            // Poll more frequently if we are draining a partition or waiting for a partition to be released
            // by another worker. This is a temporary state and we want to try and be as responsive to updates
            // as possible to minimize the time spent in this state, which is effectively downtime for orchestrations.
            if (response.IsDrainingPartition || response.IsWaitingForPartitionRelease)
            {
                timeToSleep = TimeSpan.FromSeconds(1);
            }

            consecutiveFailureCount = 0;
        }
        // Exception Status 412 represents an out of date ETag. We already logged this.
        catch (DurableTaskStorageException ex) when (ex.HttpStatusCode == (int)HttpStatusCode.PreconditionFailed)
        {
            consecutiveFailureCount++;
        }
        // The table operation was cancelled because a forceful shutdown was requested. Stop the loop
        // instead of reporting it as an unexpected error and retrying indefinitely.
        catch (OperationCanceledException) when (forcefulShutdownToken.IsCancellationRequested)
        {
            this.settings.Logger.PartitionManagerWarning(
                this.storageAccountName,
                this.settings.TaskHubName,
                this.settings.WorkerId,
                partitionId: NotApplicable,
                details: ForcefulShutdownDetails);
            break;
        }
        // ReadAndWriteTableAsync exceeded the configured timeout.
        // This may indicate a transient storage or network issue.
        // The operation is retried immediately until it has failed MaxFailureCount consecutive times.
        catch (OperationCanceledException)
        {
            this.settings.Logger.PartitionManagerWarning(
                this.storageAccountName,
                this.settings.TaskHubName,
                this.settings.WorkerId,
                partitionId: NotApplicable,
                details: $"Operation to read and write the partition table exceeded the configured timeout of {this.settings.PartitionTableOperationTimeout}.");
            consecutiveFailureCount++;
        }
        // Eat any unexpected exceptions.
        catch (Exception exception)
        {
            this.settings.Logger.PartitionManagerError(
                this.storageAccountName,
                this.settings.TaskHubName,
                this.settings.WorkerId,
                partitionId: NotApplicable,
                details: $"Unexpected error occurred while trying to manage table partition leases: {exception}");
            consecutiveFailureCount++;
        }

        // If table update failed, we re-read the table immediately to obtain the latest ETag.
        // In the case of too many successive failures, we wait before retrying to prevent excessive logs.
        if (consecutiveFailureCount > 0 && consecutiveFailureCount < MaxFailureCount)
        {
            timeToSleep = TimeSpan.Zero;
        }

        try
        {
            // isShuttingDown is only ever true once the graceful token has been cancelled, so delaying
            // on it would throw immediately; during shutdown wait on the forceful token instead so a
            // forced stop still interrupts the sleep. Normally, a graceful stop interrupts the sleep.
            CancellationToken delayToken = isShuttingDown || forcefulShutdownToken.IsCancellationRequested
                ? forcefulShutdownToken
                : gracefulShutdownToken;
            await Task.Delay(timeToSleep, delayToken);
        }
        // Forceful shutdown: stop now, even though not all leases may have been released.
        catch (OperationCanceledException) when (forcefulShutdownToken.IsCancellationRequested)
        {
            this.settings.Logger.PartitionManagerWarning(
                this.storageAccountName,
                this.settings.TaskHubName,
                this.settings.WorkerId,
                partitionId: NotApplicable,
                details: ForcefulShutdownDetails);
            break;
        }
        catch (OperationCanceledException) when (gracefulShutdownToken.IsCancellationRequested)
        {
            // Shutdown requested, but we still need to release all leases
            if (!isShuttingDown)
            {
                isShuttingDown = true;
                this.settings.Logger.PartitionManagerInfo(
                    this.storageAccountName,
                    this.settings.TaskHubName,
                    this.settings.WorkerId,
                    partitionId: NotApplicable,
                    details: "Requested to cancel partition manager table manage loop. Initiate shutdown process.");
            }
        }
    }

    this.settings.Logger.PartitionManagerInfo(
        this.storageAccountName,
        this.settings.TaskHubName,
        this.settings.WorkerId,
        partitionId: NotApplicable,
        "Stopped background table partition manager loop.");
}