in Tools/LambdaTestTool-v2/src/Amazon.Lambda.TestTool/Processes/SQSEventSource/SQSEventSourceBackgroundService.cs [62:184]
/// <summary>
/// Polls the configured SQS queue in a loop, forwards every received batch to the configured
/// Lambda function and, unless message deletion is disabled in the config, deletes the messages
/// the function did not report as failed.
/// </summary>
/// <param name="stoppingToken">Token signalled when the background service is asked to stop.</param>
/// <returns>A task representing the polling loop.</returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    // The queue arn is needed for creating the Lambda event.
    var queueArn = await GetQueueArn(stoppingToken);
    _logger.LogInformation("Starting polling for messages on SQS queue: {queueArn}", queueArn);

    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            _logger.LogDebug("Polling {queueUrl} for messages", _config.QueueUrl);

            // Long-poll the queue (up to 20 seconds) for the next batch of messages.
            var response = await _sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
            {
                QueueUrl = _config.QueueUrl,
                WaitTimeSeconds = 20,
                MessageAttributeNames = DefaultAttributesToReceive,
                MessageSystemAttributeNames = DefaultAttributesToReceive,
                MaxNumberOfMessages = _config.BatchSize,
                VisibilityTimeout = _config.VisibilityTimeout,
            }, stoppingToken);

            if (stoppingToken.IsCancellationRequested)
            {
                return;
            }

            if (response.Messages == null || response.Messages.Count == 0)
            {
                _logger.LogDebug("No messages received from while polling SQS");
                // Since there are no messages, sleep a bit to wait for messages to come.
                // The token is passed so a shutdown request is not held up by the delay.
                await Task.Delay(200, stoppingToken);
                continue;
            }

            var lambdaPayload = new
            {
                Records = ConvertToLambdaMessages(response.Messages, queueArn)
            };

            var invokeRequest = new InvokeRequest
            {
                InvocationType = InvocationType.RequestResponse,
                FunctionName = _config.FunctionName,
                Payload = JsonSerializer.Serialize(lambdaPayload, _jsonOptions)
            };

            _logger.LogInformation("Invoking Lambda function {functionName} function with {messageCount} messages", _config.FunctionName, lambdaPayload.Records.Count);
            var lambdaResponse = await _lambdaClient.InvokeAsync(invokeRequest, _config.LambdaRuntimeApi);

            if (lambdaResponse.FunctionError != null)
            {
                // Nothing is deleted on a function error; the messages stay on the queue.
                _logger.LogError("Invoking Lambda {function} function with {messageCount} failed with error {errorMessage}", _config.FunctionName, response.Messages.Count, lambdaResponse.FunctionError);
                continue;
            }

            if (_config.DisableMessageDelete)
            {
                continue;
            }

            List<Message> messagesToDelete;
            if (lambdaResponse.Payload != null && lambdaResponse.Payload.Length > 0)
            {
                var partialResponse = JsonSerializer.Deserialize<SQSBatchResponse>(lambdaResponse.Payload);
                if (partialResponse == null)
                {
                    // Rewind the payload so the raw content can be included in the error log.
                    lambdaResponse.Payload.Position = 0;
                    using var reader = new StreamReader(lambdaResponse.Payload);
                    var payloadString = reader.ReadToEnd();
                    _logger.LogError("Failed to deserialize response from Lambda function into SQSBatchResponse. Response payload:\n{payload}", payloadString);
                    continue;
                }

                var failures = partialResponse.BatchItemFailures;
                if (failures == null || failures.Count == 0)
                {
                    _logger.LogDebug("Partial SQS response received with no failures");
                    messagesToDelete = response.Messages;
                }
                else
                {
                    _logger.LogDebug("Partial SQS response received with {count} failures", failures.Count);
                    // Keep only the messages whose id was not reported as a failure.
                    messagesToDelete = response.Messages
                        .Where(message => !failures.Any(x => string.Equals(x.ItemIdentifier, message.MessageId)))
                        .ToList();
                }
            }
            else
            {
                _logger.LogDebug("No partial response received. All messages eligible for deletion");
                messagesToDelete = response.Messages;
            }

            if (messagesToDelete.Count > 0)
            {
                var deleteRequest = new DeleteMessageBatchRequest
                {
                    QueueUrl = _config.QueueUrl,
                    Entries = messagesToDelete.Select(m => new DeleteMessageBatchRequestEntry { Id = m.MessageId, ReceiptHandle = m.ReceiptHandle }).ToList()
                };

                _logger.LogDebug("Deleting {messageCount} messages from queue", deleteRequest.Entries.Count);
                var deleteResponse = await _sqsClient.DeleteMessageBatchAsync(deleteRequest, stoppingToken);

                // A batch delete can succeed as a call while individual entries fail; surface those
                // so unexpected redeliveries can be traced back to the failed delete.
                if (deleteResponse?.Failed != null && deleteResponse.Failed.Count > 0)
                {
                    foreach (var failure in deleteResponse.Failed)
                    {
                        _logger.LogWarning("Failed to delete message {messageId} from queue {queueUrl}: {errorCode} {errorMessage}", failure.Id, _config.QueueUrl, failure.Code, failure.Message);
                    }
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Also covers TaskCanceledException, which derives from OperationCanceledException.
            return;
        }
        catch (Exception e)
        {
            _logger.LogWarning(e, "Exception occurred in SQS poller for {queueUrl}: {message}", _config.QueueUrl, e.Message);

            // Add a delay before restarting loop in case the exception was a transient error that needs a little time to reset.
            try
            {
                await Task.Delay(3000, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                // Shutdown was requested during the back-off delay; stop polling.
                return;
            }
        }
    }
}