in tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/Services/QueueWorkerBackgroundService.cs [46:170]
/// <summary>
/// Runs the queue-processing loop. Each iteration receives at most one message from the configured queue,
/// processes it while a companion task keeps renewing the message lease, and then either deletes the message
/// (success), moves it to the "-poison" queue (dequeue count above the configured maximum), or resets its
/// visibility timeout using an exponential backoff based on the dequeue count.
/// </summary>
/// <param name="stoppingToken">Token that is signalled when the service should stop processing.</param>
/// <returns>A task that completes when the loop exits because <paramref name="stoppingToken"/> was cancelled.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    this.logger.LogInformation("Starting ExecuteAsync for {TypeName}", this.GetType().Name);

    string poisonQueueName = $"{this.queueName}-poison";

    QueueClient queueClient = this.queueServiceClient.GetQueueClient(this.queueName);
    QueueClient poisonQueueClient = this.queueServiceClient.GetQueueClient(poisonQueueName);

    await queueClient.CreateIfNotExistsAsync(cancellationToken: stoppingToken);
    await poisonQueueClient.CreateIfNotExistsAsync(cancellationToken: stoppingToken);

    // Observe the stopping token so the loop ends cleanly once shutdown has been requested.
    while (!stoppingToken.IsCancellationRequested)
    {
        var loopTelemetry = new RequestTelemetry
        {
            Name = "MessageLoopIteration",
            Properties = { ["QueueName"] = queueClient.Name }
        };

        using var loopOperation = this.telemetryClient.StartOperation(loopTelemetry);

        // Re-read the settings every iteration so the current option values are used.
        PipelineWitnessSettings options = this.options.CurrentValue;

        this.logger.LogDebug("Getting next message from queue {QueueName}", queueClient.Name);

        TimeSpan pauseDuration = TimeSpan.Zero;

        try
        {
            // We consider a message leased when it's made invisible in the queue and the current process has a
            // valid PopReceipt for the message. The PopReceipt is used to perform subsequent operations on the
            // "leased" message.
            QueueMessage message = await queueClient.ReceiveMessageAsync(options.MessageLeasePeriod, stoppingToken);

            if (message == null)
            {
                this.logger.LogDebug("The queue returned no message. Waiting {Delay}.", options.EmptyQueuePollDelay);
                await Task.Delay(options.EmptyQueuePollDelay, stoppingToken);
                continue;
            }

            if (message.InsertedOn.HasValue)
            {
                // Time the message spent in the queue before being picked up.
                this.telemetryClient.TrackMetric(new MetricTelemetry
                {
                    Name = $"{this.queueName} MessageLatencyMs",
                    Sum = DateTimeOffset.UtcNow.Subtract(message.InsertedOn.Value).TotalMilliseconds
                });
            }

            using IOperationHolder<RequestTelemetry> messageOperation = this.telemetryClient.StartOperation(new RequestTelemetry
            {
                Name = "ProcessMessage",
                Properties = { ["MessageId"] = message.MessageId }
            });

            try
            {
                this.logger.LogDebug("The queue returned a message.\n  Queue: {Queue}\n  Message: {MessageId}\n  Dequeue Count: {DequeueCount}\n  Pop Receipt: {PopReceipt}", queueClient.Name, message.MessageId, message.DequeueCount, message.PopReceipt);

                using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

                // Because processing a message may take longer than our initial lease period, we want to continually
                // renew our lease until processing completes.
                var renewTask = RenewMessageLeaseAsync(queueClient, message, cts.Token);
                var processTask = SafelyProcessMessageAsync(message, cts.Token);

                // Wait asynchronously for whichever task finishes first (blocking here would tie up a thread
                // for the whole processing time), then signal the other one to stop.
                await Task.WhenAny(renewTask, processTask);
                cts.Cancel();

                // if the renew task doesn't complete successfully, we can't trust the PopReceipt on the message and must abort.
                string latestPopReceipt = await renewTask;
                var result = await processTask;

                pauseDuration = result.pauseDuration;

                if (result.Success)
                {
                    this.logger.LogDebug("Message processed successfully. Removing message from queue.\n  MessageId: {MessageId}\n  Queue: {QueueName}\n  PopReceipt: {PopReceipt}", message.MessageId, queueClient.Name, latestPopReceipt);
                    await queueClient.DeleteMessageAsync(message.MessageId, latestPopReceipt, stoppingToken);
                    messageOperation.Telemetry.Success = true;
                }
                else
                {
                    messageOperation.Telemetry.Success = false;

                    if (message.DequeueCount > options.MaxDequeueCount)
                    {
                        // Too many failed attempts: copy the body to the poison queue, then remove the original.
                        this.logger.LogError("Message {MessageId} exceeded maximum dequeue count. Moving to poison queue {QueueName}", message.MessageId, poisonQueueClient.Name);
                        await poisonQueueClient.SendMessageAsync(message.Body, cancellationToken: stoppingToken);
                        this.logger.LogDebug("Removing message from queue.\n  MessageId: {MessageId}\n  Queue: {QueueName}\n  PopReceipt: {PopReceipt}", message.MessageId, queueClient.Name, latestPopReceipt);
                        await queueClient.DeleteMessageAsync(message.MessageId, latestPopReceipt, stoppingToken);
                    }
                    else
                    {
                        // Use message.DequeueCount for exponential backoff
                        var sleepMultiplier = Math.Pow(2, Math.Max(message.DequeueCount - 1, 0));
                        var sleepPeriod = TimeSpan.FromSeconds(sleepMultiplier * options.MessageErrorSleepPeriod.TotalSeconds);
                        this.logger.LogError("Resetting message visibility timeout to {SleepPeriod}.\n  MessageId: {MessageId}\n  Queue: {QueueName}\n  PopReceipt: {PopReceipt}", sleepPeriod, message.MessageId, queueClient.Name, latestPopReceipt);
                        await queueClient.UpdateMessageAsync(message.MessageId, latestPopReceipt, message.Body, sleepPeriod, cancellationToken: stoppingToken);
                    }
                }
            }
            catch (Exception ex)
            {
                // A failure while handling one message must not take down the loop; record it and move on.
                this.logger.LogError(ex, "Exception thrown while procesing queue message.");
                messageOperation.Telemetry.Success = false;
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutdown was requested while receiving or while idling on an empty queue. This is not an error,
            // so leave the loop instead of logging a failure and scheduling an error pause.
            this.logger.LogInformation("Cancellation requested. Stopping message loop for queue {QueueName}.", queueClient.Name);
            break;
        }
        catch (Exception ex)
        {
            this.logger.LogError(ex, "Exception thrown while procesing message loop.");
            pauseDuration = options.MessageErrorSleepPeriod;
            loopOperation.Telemetry.Success = false;
        }

        if (pauseDuration != TimeSpan.Zero)
        {
            this.logger.LogWarning("Pause in processing requested. Waiting {PauseDuration}.", pauseDuration);

            // Cancellation during this pause surfaces as an OperationCanceledException from this method.
            await Task.Delay(pauseDuration, stoppingToken);
        }
    }
}