samcli/lib/remote_invoke/kinesis_invoke_executors.py (76 lines of code) (raw):
"""
Remote invoke executor implementation for Kinesis streams
"""
import logging
import uuid
from dataclasses import asdict, dataclass
from typing import cast
from botocore.exceptions import ClientError, ParamValidationError
from mypy_boto3_kinesis import KinesisClient
from samcli.lib.remote_invoke.exceptions import (
ErrorBotoApiCallException,
InvalidResourceBotoParameterException,
)
from samcli.lib.remote_invoke.remote_invoke_executors import (
BotoActionExecutor,
RemoteInvokeIterableResponseType,
RemoteInvokeOutputFormat,
RemoteInvokeResponse,
)
LOG = logging.getLogger(__name__)
STREAM_NAME = "StreamName"
DATA = "Data"
PARTITION_KEY = "PartitionKey"
@dataclass
class KinesisStreamPutRecordTextOutput:
"""
Dataclass that stores put_record boto3 API fields used to create
text output.
"""
ShardId: str
SequenceNumber: str
def get_output_response_dict(self) -> dict:
"""
Returns a dict of existing dataclass fields.
Returns
-------
dict
Returns the dict of the fields that will be used as the output response for
text format output.
"""
return asdict(self, dict_factory=lambda x: {k: v for (k, v) in x if v is not None})
class KinesisPutDataExecutor(BotoActionExecutor):
"""
Calls "put_record" method of "Kinesis stream" service with given input.
If a file location provided, the file handle will be passed as input object.
"""
_kinesis_client: KinesisClient
_stream_name: str
_remote_output_format: RemoteInvokeOutputFormat
request_parameters: dict
def __init__(self, kinesis_client: KinesisClient, physical_id: str, remote_output_format: RemoteInvokeOutputFormat):
self._kinesis_client = kinesis_client
self._remote_output_format = remote_output_format
self._stream_name = physical_id
self.request_parameters = {}
def validate_action_parameters(self, parameters: dict) -> None:
"""
Validates the input boto parameters and prepares the parameters for calling the API.
Parameters
----------
parameters: dict
Boto parameters provided as input
"""
for parameter_key, parameter_value in parameters.items():
if parameter_key == STREAM_NAME:
LOG.warning("StreamName is defined using the value provided for resource_id argument.")
elif parameter_key == DATA:
LOG.warning("Data is defined using the value provided for either --event or --event-file options.")
else:
self.request_parameters[parameter_key] = parameter_value
if PARTITION_KEY not in self.request_parameters:
self.request_parameters[PARTITION_KEY] = str(uuid.uuid4())
def _execute_action(self, payload: str) -> RemoteInvokeIterableResponseType:
"""
Calls "put_record" method to write single data record to Kinesis data stream.
Parameters
----------
payload: str
The Data record which will be sent to the Kinesis stream
Yields
------
RemoteInvokeIterableResponseType
Response that is consumed by remote invoke consumers after execution
"""
if payload:
self.request_parameters[DATA] = payload
else:
self.request_parameters[DATA] = "{}"
LOG.debug("Input event not found, putting a record with Data {}")
self.request_parameters[STREAM_NAME] = self._stream_name
LOG.debug(
"Calling kinesis_client.put_record with StreamName:%s, Data:%s",
self.request_parameters[STREAM_NAME],
self.request_parameters[DATA],
)
try:
put_record_response = cast(dict, self._kinesis_client.put_record(**self.request_parameters))
if self._remote_output_format == RemoteInvokeOutputFormat.JSON:
yield RemoteInvokeResponse(put_record_response)
if self._remote_output_format == RemoteInvokeOutputFormat.TEXT:
put_record_text_output = KinesisStreamPutRecordTextOutput(
ShardId=put_record_response["ShardId"],
SequenceNumber=put_record_response["SequenceNumber"],
)
output_data = put_record_text_output.get_output_response_dict()
yield RemoteInvokeResponse(output_data)
except ParamValidationError as param_val_ex:
raise InvalidResourceBotoParameterException(
f"Invalid parameter key provided."
f" {str(param_val_ex).replace(f'{STREAM_NAME}, ', '').replace(f'{DATA}, ', '')}"
)
except ClientError as client_ex:
raise ErrorBotoApiCallException(client_ex) from client_ex