samcli/lib/remote_invoke/sqs_invoke_executors.py (96 lines of code) (raw):
"""
Remote invoke executor implementation for SQS
"""
import json
import logging
from dataclasses import asdict, dataclass
from json.decoder import JSONDecodeError
from typing import Optional, cast
from botocore.exceptions import ClientError, ParamValidationError
from mypy_boto3_sqs import SQSClient
from samcli.lib.remote_invoke.exceptions import (
ErrorBotoApiCallException,
InvalidResourceBotoParameterException,
)
from samcli.lib.remote_invoke.remote_invoke_executors import (
BotoActionExecutor,
RemoteInvokeIterableResponseType,
RemoteInvokeOutputFormat,
RemoteInvokeResponse,
)
# Module-level logger used by the executor and helper below.
LOG = logging.getLogger(__name__)
# Names of the boto3 SQS parameters that this module reads or sets explicitly.
# QUEUE_URL is also the key read from the get_queue_url response.
QUEUE_URL = "QueueUrl"
MESSAGE_BODY = "MessageBody"
DELAY_SECONDS = "DelaySeconds"
MESSAGE_ATTRIBUTES = "MessageAttributes"
MESSAGE_SYSTEM_ATTRIBUTES = "MessageSystemAttributes"
@dataclass
class SqsSendMessageTextOutput:
    """
    Holds the send_message boto3 API response fields that make up the
    text output.
    """

    MD5OfMessageBody: str
    MessageId: str
    MD5OfMessageAttributes: Optional[str] = None

    def get_output_response_dict(self) -> dict:
        """
        Converts this dataclass into a dict, leaving out fields that are None.

        Returns
        -------
        dict
            The populated fields, used as the output response for the
            text format output.
        """

        def _without_none(pairs):
            # asdict hands the factory a list of (field_name, value) tuples.
            return {name: value for name, value in pairs if value is not None}

        return asdict(self, dict_factory=_without_none)
class SqsSendMessageExecutor(BotoActionExecutor):
    """
    Calls "send_message" method of "SQS" service with given input.

    The queue URL is taken from the physical id given at construction time and
    the message body from the payload passed to the action; any additional boto
    parameters are collected through validate_action_parameters.
    """

    _sqs_client: SQSClient
    _queue_url: str
    _remote_output_format: RemoteInvokeOutputFormat
    request_parameters: dict

    def __init__(self, sqs_client: SQSClient, physical_id: str, remote_output_format: RemoteInvokeOutputFormat):
        """
        Parameters
        ----------
        sqs_client: SQSClient
            SQS client used to call the send_message boto3 API
        physical_id: str
            Value used as the QueueUrl of the target queue
        remote_output_format: RemoteInvokeOutputFormat
            Output format that decides which response fields are yielded
        """
        self._sqs_client = sqs_client
        self._remote_output_format = remote_output_format
        self._queue_url = physical_id
        self.request_parameters = {}

    def validate_action_parameters(self, parameters: dict) -> None:
        """
        Validates the input boto parameters and prepares the parameters for calling the API.

        Parameters
        ----------
        parameters: dict
            Boto parameters provided as input

        Raises
        ------
        InvalidResourceBotoParameterException
            When DelaySeconds cannot be converted to an int, or when
            MessageAttributes / MessageSystemAttributes cannot be parsed as JSON
        """
        try:
            for parameter_key, parameter_value in parameters.items():
                if parameter_key == QUEUE_URL:
                    # Not stored: _execute_action always sets QueueUrl itself.
                    LOG.warning("QueueUrl is defined using the value provided for resource_id argument.")
                elif parameter_key == MESSAGE_BODY:
                    # Not stored: _execute_action always sets MessageBody from the payload.
                    LOG.warning(
                        "MessageBody is defined using the value provided for either --event or --event-file options."
                    )
                elif parameter_key == DELAY_SECONDS:
                    self.request_parameters[parameter_key] = int(parameter_value)
                elif parameter_key in {MESSAGE_ATTRIBUTES, MESSAGE_SYSTEM_ATTRIBUTES}:
                    # Load these parameters as dict which is the format required for the send_message boto3 API
                    # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sqs/client/send_message.html
                    self.request_parameters[parameter_key] = json.loads(parameter_value)
                else:
                    self.request_parameters[parameter_key] = parameter_value
        except (TypeError, ValueError, JSONDecodeError) as err:
            # TypeError covers values such as None, which int() and json.loads()
            # reject with TypeError rather than ValueError.
            raise InvalidResourceBotoParameterException(
                f"Invalid value provided for parameter {parameter_key}", err
            ) from err

    def _execute_action(self, payload: str) -> RemoteInvokeIterableResponseType:
        """
        Calls "send_message" method to send a message to the SQS queue.

        Parameters
        ----------
        payload: str
            The MessageBody which will be sent to the SQS. When empty, "{}" is sent instead.

        Yields
        ------
        RemoteInvokeIterableResponseType
            Response that is consumed by remote invoke consumers after execution

        Raises
        ------
        InvalidResourceBotoParameterException
            When boto rejects the request parameters before the call is made
        ErrorBotoApiCallException
            When the send_message API call fails with a ClientError
        """
        if payload:
            self.request_parameters[MESSAGE_BODY] = payload
        else:
            self.request_parameters[MESSAGE_BODY] = "{}"
            LOG.debug("Input event not found, sending a message with MessageBody {}")
        self.request_parameters[QUEUE_URL] = self._queue_url
        LOG.debug(
            "Calling sqs_client.send_message with QueueUrl:%s, MessageBody:%s",
            self.request_parameters[QUEUE_URL],
            self.request_parameters[MESSAGE_BODY],
        )

        # Only the API call can raise the boto exceptions handled here, so the
        # try block is limited to it.
        try:
            send_message_response = cast(dict, self._sqs_client.send_message(**self.request_parameters))
        except ParamValidationError as param_val_ex:
            # QueueUrl and MessageBody are set by this executor rather than passed as
            # parameters, so they are removed from the text of boto's validation error.
            validation_details = (
                str(param_val_ex).replace(f"{QUEUE_URL}, ", "").replace(f"{MESSAGE_BODY}, ", "")
            )
            raise InvalidResourceBotoParameterException(
                f"Invalid parameter key provided. {validation_details}"
            ) from param_val_ex
        except ClientError as client_ex:
            raise ErrorBotoApiCallException(client_ex) from client_ex

        if self._remote_output_format == RemoteInvokeOutputFormat.JSON:
            # JSON output passes the boto response through unchanged.
            yield RemoteInvokeResponse(send_message_response)
        if self._remote_output_format == RemoteInvokeOutputFormat.TEXT:
            # Create an object with MD5OfMessageBody and MessageId fields, and write to stdout
            send_message_text_output = SqsSendMessageTextOutput(
                MD5OfMessageBody=send_message_response["MD5OfMessageBody"],
                MessageId=send_message_response["MessageId"],
                MD5OfMessageAttributes=send_message_response.get("MD5OfMessageAttributes"),
            )
            output_data = send_message_text_output.get_output_response_dict()
            yield RemoteInvokeResponse(output_data)
def get_queue_url_from_arn(sqs_client: SQSClient, queue_name: str) -> str:
    """
    Looks up the queue url that belongs to the given SQS queue name.

    Parameters
    ----------
    sqs_client: SQSClient
        SQS client used for the get_queue_url boto3 API call
    queue_name: str
        Name of the SQS queue whose url is requested

    Returns
    -------
    str
        The SQS queue url, or an empty string when the response carries no QueueUrl

    Raises
    ------
    ErrorBotoApiCallException
        When the get_queue_url API call fails with a ClientError
    """
    try:
        response = sqs_client.get_queue_url(QueueName=queue_name)
    except ClientError as client_ex:
        LOG.debug("Failed to get queue_url using the provided SQS Arn")
        raise ErrorBotoApiCallException(client_ex) from client_ex
    else:
        return cast(str, response.get(QUEUE_URL, ""))