samcli/lib/remote_invoke/remote_invoke_executors.py (139 lines of code) (raw):
"""
Abstract class definitions and generic implementations for remote invoke
"""
import json
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from io import TextIOWrapper
from pathlib import Path
from typing import Any, Callable, Generic, Iterable, List, Optional, TypeVar, Union, cast
from typing_extensions import TypeAlias
# Module-level logger, named after this module via __name__
LOG = logging.getLogger(__name__)
@dataclass
class RemoteInvokeResponse:
    """
    Carries a single response produced by a remote invoke execution.

    The ``response`` field is a dict for raw events and a str for every other kind of response.
    """

    response: Union[str, dict]
@dataclass
class RemoteInvokeLogOutput:
    """
    Carries a piece of log output produced by a remote invoke execution.
    """

    log_output: str
# Type alias for the iterable returned by remote invoke execution: every item is either a
# RemoteInvokeResponse or a RemoteInvokeLogOutput. Declared once so that the return type stays
# consistent between the different places that use it.
RemoteInvokeIterableResponseType: TypeAlias = Iterable[Union[RemoteInvokeResponse, RemoteInvokeLogOutput]]
class RemoteInvokeOutputFormat(Enum):
    """
    Output formats that remote invoke can use.
    """

    TEXT = "text"
    JSON = "json"
class RemoteInvokeEventType(Enum):
    """
    Kinds of event sources that can be used when calling remote invoke.
    """

    FILE = "file"
    NO_EVENT = "no_event"
    REMOTE_EVENT = "remote_event"
    TEXT = "text"

    @classmethod
    def get_event_type(
        cls,
        event: Optional[str] = None,
        event_file: Optional[TextIOWrapper] = None,
        test_event_name: Optional[str] = None,
    ) -> str:
        """
        Work out which event type applies to the given inputs.

        The inputs are checked for truthiness in a fixed order of precedence:
        ``test_event_name``, then ``event_file``, then ``event``.

        Parameters
        ----------
        event : Optional[str]
            Event given as a string
        event_file : Optional[TextIOWrapper]
            Event given as a file object
        test_event_name : Optional[str]
            Name of a remote test event

        Returns
        -------
        str
            The ``value`` of the matching member; the value of NO_EVENT when no input is truthy
        """
        if test_event_name:
            selected = cls.REMOTE_EVENT
        elif event_file:
            selected = cls.FILE
        elif event:
            selected = cls.TEXT
        else:
            selected = cls.NO_EVENT
        return selected.value

    @classmethod
    def get_possible_values(cls):
        """
        Return the values of all members as a list, in definition order.
        """
        return [member.value for member in cls]
class RemoteInvokeExecutionInfo:
    """
    Holds the request information, and slots for the result information, of a remote invoke execution.

    Request attributes, taken from the constructor arguments:
        payload: payload given by the customer
        payload_file: file object for the payload, when a file is given
        parameters: dict of parameters for the invocation
        output_format: requested RemoteInvokeOutputFormat

    Result attributes, all initialised to None by the constructor:
        response: response object of the execution
        log_output: log output of the execution
        exception: exception raised by the execution, if any
    """

    # Request related properties
    payload: Optional[Union[str, List, dict]]
    payload_file: Optional[TextIOWrapper]
    parameters: dict
    output_format: RemoteInvokeOutputFormat

    # Response related properties
    response: Optional[Union[dict, str]]
    log_output: Optional[str]
    exception: Optional[Exception]

    def __init__(
        self,
        payload: Optional[Union[str, List, dict]],
        payload_file: Optional[TextIOWrapper],
        parameters: dict,
        output_format: RemoteInvokeOutputFormat,
    ):
        # request side: stored exactly as given
        self.payload, self.payload_file = payload, payload_file
        self.parameters, self.output_format = parameters, output_format

        # result side: nothing is known yet
        self.response = self.log_output = self.exception = None

    def is_file_provided(self) -> bool:
        """
        Return True when ``payload_file`` is truthy, False otherwise.
        """
        return True if self.payload_file else False

    @property
    def payload_file_path(self) -> Optional[TextIOWrapper]:
        """
        Return ``payload_file`` when a file is provided, None otherwise.
        """
        if not self.is_file_provided():
            return None
        return self.payload_file

    def is_succeeded(self) -> bool:
        """
        Return True when ``response`` is truthy, False otherwise.
        """
        return True if self.response else False
# Type variable for the object type that a mapper or consumer works on
RemoteInvokeResponseType = TypeVar("RemoteInvokeResponseType")


class RemoteInvokeRequestResponseMapper(Generic[RemoteInvokeResponseType]):
    """
    Definition of a mapper that can be applied to remote invoke requests or responses.

    For a request, a mapper can for instance turn a string into a JSON object when JSON is required.
    For a response, a mapper can for instance turn streaming results back into a string so that they
    can be displayed.

    NOTE: this class does not derive from ABC, so the @abstractmethod marker is not enforced when an
    instance is created; a ``map`` that is not overridden raises NotImplementedError when it is called.
    """

    @abstractmethod
    def map(self, remote_invoke_input: RemoteInvokeResponseType) -> RemoteInvokeResponseType:
        """
        Map the given object and return the mapped object of the same type.

        Parameters
        ----------
        remote_invoke_input : RemoteInvokeResponseType
            Object to map

        Raises
        ------
        NotImplementedError
            Always, in this base definition
        """
        raise NotImplementedError
class RemoteInvokeConsumer(Generic[RemoteInvokeResponseType]):
    """
    Definition of a consumer which receives remote invoke results of a given type.

    NOTE: this class does not derive from ABC, so the @abstractmethod marker is not enforced when an
    instance is created; a ``consume`` that is not overridden raises NotImplementedError when called.
    """

    @abstractmethod
    def consume(self, remote_invoke_response: RemoteInvokeResponseType) -> None:
        """
        Consume the given object.

        Parameters
        ----------
        remote_invoke_response : RemoteInvokeResponseType
            Object to consume

        Raises
        ------
        NotImplementedError
            Always, in this base definition
        """
        raise NotImplementedError
class ResponseObjectToJsonStringMapper(RemoteInvokeRequestResponseMapper):
    """
    Replaces the response object held by a RemoteInvokeResponse with its multi-line JSON string form
    """

    def map(self, remote_invoke_input: RemoteInvokeResponse) -> RemoteInvokeResponse:
        """
        Serialize ``remote_invoke_input.response`` to JSON, indented by 2 spaces, and store it back.

        The given instance is updated in place and the same instance is returned.
        """
        LOG.debug("Converting response object into JSON")
        formatted_response = json.dumps(remote_invoke_input.response, indent=2)
        remote_invoke_input.response = formatted_response
        return remote_invoke_input
class BotoActionExecutor(ABC):
    """
    Executes a specific boto3 service action for remote invoke.

    Subclasses implement ``_execute_action`` (the API call itself) and ``validate_action_parameters``.
    ``execute`` selects between the string payload and the payload file of the given
    RemoteInvokeExecutionInfo and returns whatever iterable the selected action returns.
    No exception is caught in this class.
    """

    @abstractmethod
    def _execute_action(self, payload: str) -> RemoteInvokeIterableResponseType:
        """
        Specific boto3 API call implementation.

        Parameters
        ----------
        payload : str
            Payload that has been provided

        Returns
        -------
        RemoteInvokeIterableResponseType
            Iterable of RemoteInvokeResponse and/or RemoteInvokeLogOutput items
        """
        raise NotImplementedError()

    @abstractmethod
    def validate_action_parameters(self, parameters: dict):
        """
        Validates the input boto3 parameters before calling the API

        :param parameters: Boto parameters passed as input
        """
        raise NotImplementedError()

    def _execute_action_file(self, payload_file: TextIOWrapper) -> RemoteInvokeIterableResponseType:
        """
        Variant of the action for a payload which is given as a file. Some boto3 APIs may accept a
        file rather than a string, so subclasses can override this method to support file types
        other than plain text.

        Default implementation reads the file contents as a string and calls ``_execute_action``.

        Parameters
        ----------
        payload_file : TextIOWrapper
            File object of the payload

        Returns
        -------
        RemoteInvokeIterableResponseType
            Iterable of RemoteInvokeResponse and/or RemoteInvokeLogOutput items
        """
        return self._execute_action(payload_file.read())

    def execute(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeIterableResponseType:
        """
        Executes the boto3 action with the payload file if one is provided, with the payload otherwise.

        Parameters
        ----------
        remote_invoke_input : RemoteInvokeExecutionInfo
            Remote execution details which contains payload or payload file information

        Returns
        -------
        RemoteInvokeIterableResponseType
            Returns iterable response, see response type definition for details
        """
        # if a file is provided for the payload, hand the file object to the file specific action
        if remote_invoke_input.is_file_provided():
            # payload_file_path is Optional only for the "no file" case, which is excluded here
            return self._execute_action_file(cast(TextIOWrapper, remote_invoke_input.payload_file_path))

        # the cast only informs the type checker; the payload is passed on unchanged
        return self._execute_action(cast(str, remote_invoke_input.payload))
class RemoteInvokeExecutor:
    """
    Generic remote invoke executor, made of request mappers, response mappers, a boto action executor
    and two consumers (one for responses, one for log output).

    The input is first passed through the request mappers, then handed to the boto action executor
    which calls the API. Every RemoteInvokeResponse in the result is passed through the response
    mappers and given to the response consumer; every RemoteInvokeLogOutput is given to the log consumer.
    """

    _request_mappers: List[RemoteInvokeRequestResponseMapper[RemoteInvokeExecutionInfo]]
    _response_mappers: List[RemoteInvokeRequestResponseMapper[RemoteInvokeResponse]]
    _boto_action_executor: BotoActionExecutor

    _response_consumer: RemoteInvokeConsumer[RemoteInvokeResponse]
    _log_consumer: RemoteInvokeConsumer[RemoteInvokeLogOutput]

    def __init__(
        self,
        request_mappers: List[RemoteInvokeRequestResponseMapper[RemoteInvokeExecutionInfo]],
        response_mappers: List[RemoteInvokeRequestResponseMapper[RemoteInvokeResponse]],
        boto_action_executor: BotoActionExecutor,
        response_consumer: RemoteInvokeConsumer[RemoteInvokeResponse],
        log_consumer: RemoteInvokeConsumer[RemoteInvokeLogOutput],
    ):
        # mapping stages
        self._request_mappers, self._response_mappers = request_mappers, response_mappers
        # the component which performs the API call
        self._boto_action_executor = boto_action_executor
        # sinks for the two kinds of results
        self._response_consumer, self._log_consumer = response_consumer, log_consumer

    def execute(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> None:
        """
        Runs the whole flow for one remote invoke execution.

        Request mappers are applied first, then the boto parameters are validated and the
        BotoActionExecutor is invoked. Each result is then dispatched by its type: responses are
        mapped with the response mappers and consumed by the response consumer, log outputs are
        consumed by the log consumer as they are.
        """
        mapped_input = self._map_input(remote_invoke_input)
        self._boto_action_executor.validate_action_parameters(mapped_input.parameters)

        for result in self._boto_action_executor.execute(mapped_input):
            # the two checks are independent of each other, a result is tested against both types
            if isinstance(result, RemoteInvokeResponse):
                self._response_consumer.consume(self._map_output(result))
            if isinstance(result, RemoteInvokeLogOutput):
                self._log_consumer.consume(result)

    def _map_input(self, remote_invoke_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExecutionInfo:
        """
        Passes the given input through every request mapper, in list order.

        Parameters
        ----------
        remote_invoke_input : RemoteInvokeExecutionInfo
            Remote invoke execution info which contains the request information

        Returns
        -------
        RemoteInvokeExecutionInfo
            Result of the last request mapper, or the given input if there are no request mappers
        """
        current_input = remote_invoke_input
        for request_mapper in self._request_mappers:
            current_input = request_mapper.map(current_input)
        return current_input

    def _map_output(self, remote_invoke_output: RemoteInvokeResponse) -> RemoteInvokeResponse:
        """
        Passes the given response through every response mapper, in list order.

        Parameters
        ----------
        remote_invoke_output : RemoteInvokeResponse
            Remote invoke response which contains the payload itself

        Returns
        -------
        RemoteInvokeResponse
            Result of the last response mapper, or the given response if there are no response mappers
        """
        current_output = remote_invoke_output
        for response_mapper in self._response_mappers:
            current_output = response_mapper.map(current_output)
        return current_output