protected override async Task ExecuteAsync()

in tools/pipeline-witness/Azure.Sdk.Tools.PipelineWitness/Services/QueueWorkerBackgroundService.cs [46:170]


        /// <summary>
        /// Runs the queue polling loop. Each iteration leases at most one message from the configured queue,
        /// processes it while a companion task keeps the lease renewed, and then either deletes the message
        /// (success), moves it to the "-poison" queue (dequeue count above the configured maximum), or makes it
        /// visible again after an exponentially growing delay (failure).
        /// </summary>
        /// <param name="stoppingToken">Signals that the worker should stop; the loop ends once it is cancelled.</param>
        /// <returns>A task that completes when the loop exits because <paramref name="stoppingToken"/> was cancelled.</returns>
        /// <exception cref="OperationCanceledException">
        /// Thrown if <paramref name="stoppingToken"/> is cancelled during queue creation or while waiting out a
        /// requested pause between iterations.
        /// </exception>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            this.logger.LogInformation("Starting ExecuteAsync for {TypeName}", this.GetType().Name);

            string poisonQueueName = $"{this.queueName}-poison";

            QueueClient queueClient = this.queueServiceClient.GetQueueClient(this.queueName);
            QueueClient poisonQueueClient = this.queueServiceClient.GetQueueClient(poisonQueueName);

            await queueClient.CreateIfNotExistsAsync(cancellationToken: stoppingToken);
            await poisonQueueClient.CreateIfNotExistsAsync(cancellationToken: stoppingToken);

            // Check the stopping token on every iteration so shutdown does not depend on a later delay throwing.
            while (!stoppingToken.IsCancellationRequested)
            {
                var loopTelemetry = new RequestTelemetry
                {
                    Name = "MessageLoopIteration",
                    Properties = { ["QueueName"] = queueClient.Name }
                };

                // Disposed at the end of every iteration (including 'continue' and 'break').
                using var loopOperation = this.telemetryClient.StartOperation(loopTelemetry);

                // Re-read the settings each iteration so that configuration changes are picked up.
                PipelineWitnessSettings options = this.options.CurrentValue;

                this.logger.LogDebug("Getting next message from queue {QueueName}", queueClient.Name);

                TimeSpan pauseDuration = TimeSpan.Zero;

                try
                {
                    // We consider a message leased when it's made invisible in the queue and the current process has a
                    // valid PopReceipt for the message. The PopReceipt is used to perform subsequent operations on the
                    // "leased" message.
                    QueueMessage message = await queueClient.ReceiveMessageAsync(options.MessageLeasePeriod, stoppingToken);

                    if (message is null)
                    {
                        this.logger.LogDebug("The queue returned no message. Waiting {Delay}.", options.EmptyQueuePollDelay);
                        await Task.Delay(options.EmptyQueuePollDelay, stoppingToken);
                        continue;
                    }

                    if (message.InsertedOn.HasValue)
                    {
                        // Time the message spent in the queue before this worker picked it up.
                        this.telemetryClient.TrackMetric(new MetricTelemetry
                        {
                            Name = $"{this.queueName} MessageLatencyMs",
                            Sum = DateTimeOffset.UtcNow.Subtract(message.InsertedOn.Value).TotalMilliseconds
                        });
                    }

                    using IOperationHolder<RequestTelemetry> messageOperation = this.telemetryClient.StartOperation(new RequestTelemetry
                    {
                        Name = "ProcessMessage",
                        Properties = { ["MessageId"] = message.MessageId }
                    });

                    try
                    {
                        this.logger.LogDebug("The queue returned a message.\n  Queue: {Queue}\n  Message: {MessageId}\n  Dequeue Count: {DequeueCount}\n  Pop Receipt: {PopReceipt}", queueClient.Name, message.MessageId, message.DequeueCount, message.PopReceipt);

                        using CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);

                        // Because processing a message may take longer than our initial lease period, we want to continually
                        // renew our lease until processing completes.
                        var renewTask = RenewMessageLeaseAsync(queueClient, message, cts.Token);
                        var processTask = SafelyProcessMessageAsync(message, cts.Token);

                        // Wait asynchronously for whichever finishes first. WhenAny itself never throws; any failure is
                        // observed when the individual tasks are awaited below.
                        await Task.WhenAny(renewTask, processTask);

                        // Signal the task that is still running to stop.
                        cts.Cancel();

                        // if the renew task doesn't complete successfully, we can't trust the PopReceipt on the message and must abort.
                        string latestPopReceipt = await renewTask;
                        var result = await processTask;
                        pauseDuration = result.pauseDuration;

                        if (result.Success)
                        {
                            this.logger.LogDebug("Message processed successfully. Removing message from queue.\n  MessageId: {MessageId}\n  Queue: {QueueName}\n  PopReceipt: {PopReceipt}", message.MessageId, queueClient.Name, latestPopReceipt);
                            await queueClient.DeleteMessageAsync(message.MessageId, latestPopReceipt, stoppingToken);
                            messageOperation.Telemetry.Success = true;
                        }
                        else
                        {
                            messageOperation.Telemetry.Success = false;
                            if (message.DequeueCount > options.MaxDequeueCount)
                            {
                                // Copy to the poison queue first, then delete from the source queue.
                                this.logger.LogError("Message {MessageId} exceeded maximum dequeue count. Moving to poison queue {QueueName}", message.MessageId, poisonQueueClient.Name);
                                await poisonQueueClient.SendMessageAsync(message.Body, cancellationToken: stoppingToken);
                                this.logger.LogDebug("Removing message from queue.\n  MessageId: {MessageId}\n  Queue: {QueueName}\n  PopReceipt: {PopReceipt}", message.MessageId, queueClient.Name, latestPopReceipt);
                                await queueClient.DeleteMessageAsync(message.MessageId, latestPopReceipt, stoppingToken);
                            }
                            else
                            {
                                // Use message.DequeueCount for exponential backoff
                                var sleepMultiplier = Math.Pow(2, Math.Max(message.DequeueCount - 1, 0));
                                var sleepPeriod = TimeSpan.FromSeconds(sleepMultiplier * options.MessageErrorSleepPeriod.TotalSeconds);

                                this.logger.LogError("Resetting message visibility timeout to {SleepPeriod}.\n  MessageId: {MessageId}\n  Queue: {QueueName}\n  PopReceipt: {PopReceipt}", sleepPeriod, message.MessageId, queueClient.Name, latestPopReceipt);
                                await queueClient.UpdateMessageAsync(message.MessageId, latestPopReceipt, message.Body, sleepPeriod, cancellationToken: stoppingToken);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        // A failure while handling one message must not stop the loop; the message is left in the queue.
                        this.logger.LogError(ex, "Exception thrown while procesing queue message.");
                        messageOperation.Telemetry.Success = false;
                    }
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown was requested while receiving or idling; this is not an error, so leave the loop quietly.
                    break;
                }
                catch (Exception ex)
                {
                    this.logger.LogError(ex, "Exception thrown while procesing message loop.");
                    pauseDuration = options.MessageErrorSleepPeriod;
                    loopOperation.Telemetry.Success = false;
                }


                if (pauseDuration != TimeSpan.Zero)
                {
                    this.logger.LogWarning("Pause in processing requested. Waiting {PauseDuration}.", pauseDuration);
                    await Task.Delay(pauseDuration, stoppingToken);
                }
            }
        }