protected override async Task ExecuteAsync()

in Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs [62:184]


    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // The queue arn is needed for creating the Lambda event.
        var queueArn = await GetQueueArn(stoppingToken);
        _logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn);
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                _logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl);
                // Read a message from the queue using the ExternalCommands console application.
                var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
                {
                    QueueUrl = _config.QueueUrl,
                    WaitTimeSeconds = 20,
                    MessageAttributeNames = DefaultAttributesToReceive,
                    MessageSystemAttributeNames = DefaultAttributesToReceive,
                    MaxNumberOfMessages = _config.BatchSize,
                    VisibilityTimeout = _config.VisibilityTimeout,
                }, stoppingToken);

                if (stoppingToken.IsCancellationRequested)
                {
                    return;
                }
                if (response.Messages == null || response.Messages.Count == 0)
                {
                    _logger.LogDebug("No messages received from while polling SQS");
                    // Since there are no messages, sleep a bit to wait for messages to come.
                    await Task.Delay(200);
                    continue;
                }

                var lambdaPayload = new
                {
                    Records = ConvertToLambdaMessages(response.Messages, queueArn)
                };

                var invokeRequest = new InvokeRequest
                {
                    InvocationType = InvocationType.RequestResponse,
                    FunctionName = _config.FunctionName,
                    Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions)
                };

                _logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count);
                var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi);

                if (lambdaResponse.FunctionError != null)
                {
                    _logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError);
                    continue;
                }

                if (!_config.DisableMessageDelete)
                {
                    List<Message> messagesToDelete;
                    if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0)
                    {
                        var partialResponse = JsonSerializer.Deserialize<SQSBatchResponse>(lambdaResponse.Payload);
                        if (partialResponse == null)
                        {
                            lambdaResponse.Payload.Position = 0;
                            using var reader = new StreamReader(lambdaResponse.Payload);
                            var payloadString = reader.ReadToEnd();
                            _logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString);
                            continue;
                        }

                        if (partialResponse.BatchItemFailures == null || partialResponse.BatchItemFailures.Count == 0)
                        {
                            _logger.LogDebug("Partial SQS response received with no failures");
                            messagesToDelete = response.Messages;
                        }
                        else
                        {
                            _logger.LogDebug("Partial SQS response received with {count} failures", partialResponse.BatchItemFailures.Count);
                            messagesToDelete = new List<Message>();
                            foreach (var message in response.Messages)
                            {
                                if (!partialResponse.BatchItemFailures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId)))
                                {
                                    messagesToDelete.Add(message);
                                }
                            }
                        }
                    }
                    else
                    {
                        _logger.LogDebug("No partial response received. All messages eligible for deletion");
                        messagesToDelete = response.Messages;
                    }

                    if (messagesToDelete.Count > 0)
                    {
                        var deleteRequest = new DeleteMessageBatchRequest
                        {
                            QueueUrl = _config.QueueUrl,
                            Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList()
                        };

                        _logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count);
                        await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken);
                    }
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                return;
            }
            catch (TaskCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                return;
            }
            catch (Exception e)
            {
                _logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message);

                // Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset.
                await Task.Delay(3000);
            }
        }
    }