in src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs [237:306]
/// <summary>
/// Abandons a queue message by updating its visibility timeout so that it can be
/// picked up again later, using a delay that grows exponentially with the message's
/// dequeue count and is capped at 10 minutes.
/// </summary>
/// <param name="queueMessage">The queue message to abandon.</param>
/// <param name="taskMessage">The associated task message, if any; used only to populate log fields.</param>
/// <param name="instance">The associated orchestration instance, if any; used only to populate log fields.</param>
/// <param name="traceActivityId">Optional trace activity ID forwarded to the queue update call.</param>
/// <param name="sequenceNumber">Sequence number included in the abandon log entry.</param>
/// <returns>
/// The receipt returned by the queue update, or <c>null</c> when the update threw and the
/// exception was handed to <c>HandleMessagingExceptions</c> without being rethrown.
/// </returns>
protected async Task<UpdateReceipt?> AbandonMessageAsync(
    QueueMessage queueMessage,
    TaskMessage? taskMessage,
    OrchestrationInstance? instance,
    Guid? traceActivityId,
    long sequenceNumber)
{
    // Gather the identifiers used for logging; missing inputs fall back to empty/-1 placeholders.
    string instanceId = string.Empty;
    string executionId = string.Empty;
    if (instance != null)
    {
        instanceId = instance.InstanceId ?? string.Empty;
        executionId = instance.ExecutionId ?? string.Empty;
    }

    string eventType = string.Empty;
    int taskEventId = -1;
    if (taskMessage != null)
    {
        eventType = taskMessage.Event.EventType.ToString() ?? string.Empty;
        taskEventId = Utils.GetTaskEventId(taskMessage.Event);
    }

    // Back off exponentially (2^DequeueCount seconds) up to a ceiling of 10 minutes.
    // The exponent is only evaluated for dequeue counts of 30 or less so that the
    // result of Math.Pow always fits in an int; anything above that goes straight to the ceiling.
    const int maxSecondsToWait = 600;
    var dequeueCount = queueMessage.DequeueCount;

    int visibilityDelaySeconds = maxSecondsToWait;
    if (dequeueCount <= 30)
    {
        visibilityDelaySeconds = Math.Min((int)Math.Pow(2, dequeueCount), maxSecondsToWait);
    }

    // A message whose delay has reached the ceiling is reported as a poison message.
    bool reachedMaxDelay = visibilityDelaySeconds == maxSecondsToWait;
    if (reachedMaxDelay)
    {
        this.settings.Logger.PoisonMessageDetected(
            this.storageAccountName,
            this.settings.TaskHubName,
            eventType,
            taskEventId,
            queueMessage.MessageId,
            instanceId,
            executionId,
            this.storageQueue.Name,
            dequeueCount);
    }

    this.settings.Logger.AbandoningMessage(
        this.storageAccountName,
        this.settings.TaskHubName,
        eventType,
        taskEventId,
        queueMessage.MessageId,
        instanceId,
        executionId,
        this.storageQueue.Name,
        sequenceNumber,
        queueMessage.PopReceipt,
        visibilityDelaySeconds);

    UpdateReceipt? receipt = null;
    try
    {
        // "Abandoning" means setting the message's visibility timeout to the computed delay,
        // so that this node or another node can try to process it again later.
        TimeSpan visibilityDelay = TimeSpan.FromSeconds(visibilityDelaySeconds);
        receipt = await this.storageQueue.UpdateMessageAsync(
            queueMessage,
            visibilityDelay,
            traceActivityId);
    }
    catch (Exception e)
    {
        // The message may already have been processed and deleted.
        this.HandleMessagingExceptions(
            e,
            queueMessage.MessageId,
            instanceId,
            executionId,
            eventType,
            taskEventId,
            details: $"Caller: {nameof(AbandonMessageAsync)}",
            queueMessage.PopReceipt);
    }

    return receipt;
}