samcli/lib/remote_invoke/lambda_invoke_executors.py (163 lines of code) (raw):
"""
Remote invoke executor implementation for Lambda
"""
import base64
import codecs
import json
import logging
from abc import ABC, abstractmethod
from json import JSONDecodeError
from typing import cast

from botocore.eventstream import EventStream
from botocore.exceptions import ClientError, ParamValidationError
from botocore.response import StreamingBody
from mypy_boto3_lambda.client import LambdaClient

from samcli.lib.remote_invoke.exceptions import (
    ErrorBotoApiCallException,
    InvalideBotoResponseException,
    InvalidResourceBotoParameterException,
)
from samcli.lib.remote_invoke.remote_invoke_executors import (
    BotoActionExecutor,
    RemoteInvokeExecutionInfo,
    RemoteInvokeIterableResponseType,
    RemoteInvokeLogOutput,
    RemoteInvokeOutputFormat,
    RemoteInvokeRequestResponseMapper,
    RemoteInvokeResponse,
)
from samcli.lib.utils import boto_utils
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Request parameter keys that the executors fill in themselves; user-supplied values for them are ignored.
FUNCTION_NAME = "FunctionName"
# "Payload" is also the key under which the response body (and each streamed chunk) is read back.
PAYLOAD = "Payload"

# Keys read from the invoke_with_response_stream response and from the events it yields.
EVENT_STREAM = "EventStream"
PAYLOAD_CHUNK = "PayloadChunk"
INVOKE_COMPLETE = "InvokeComplete"

# Key holding the base64-encoded log output, read from the invoke response and from the InvokeComplete event.
LOG_RESULT = "LogResult"

# Key read from the function URL configuration, and the value that selects the streaming executor path.
INVOKE_MODE = "InvokeMode"
RESPONSE_STREAM = "RESPONSE_STREAM"
class AbstractLambdaInvokeExecutor(BotoActionExecutor, ABC):
    """
    Shared base for the executors that invoke a Lambda function through a boto client.

    This class owns the request parameters and the translation of boto errors into
    remote-invoke exceptions; subclasses provide the actual client call by implementing
    ``_execute_lambda_invoke``.

    NOTE(review): the previous docstring stated that when a file location is provided, the
    file handle is passed as the Payload object. That handling is not visible in this class.
    """

    _lambda_client: LambdaClient
    _function_name: str
    _remote_output_format: RemoteInvokeOutputFormat

    def __init__(
        self,
        lambda_client: LambdaClient,
        function_name: str,
        remote_output_format: RemoteInvokeOutputFormat,
    ):
        self._remote_output_format = remote_output_format
        self._function_name = function_name
        self._lambda_client = lambda_client
        # Defaults sent with every call; validate_action_parameters may add to or override them.
        self.request_parameters = {
            "InvocationType": "RequestResponse",
            "LogType": "Tail",
        }

    def validate_action_parameters(self, parameters: dict) -> None:
        """
        Copies the given boto parameters into the request parameters, skipping reserved keys.

        Parameters
        ----------
        parameters: dict
            Boto parameters provided as input. ``FunctionName`` and ``Payload`` are not copied;
            a warning is logged for each instead, since both are set by the executor itself.
        """
        reserved_key_warnings = {
            FUNCTION_NAME: "FunctionName is defined using the value provided for resource_id argument.",
            PAYLOAD: "Payload is defined using the value provided for either --event or --event-file options.",
        }
        for key, value in parameters.items():
            warning = reserved_key_warnings.get(key)
            if warning is not None:
                LOG.warning(warning)
                continue
            self.request_parameters[key] = value

    def _execute_action(self, payload: str) -> RemoteInvokeIterableResponseType:
        # The reserved keys are always taken from the executor's own state and the given payload.
        self.request_parameters.update({FUNCTION_NAME: self._function_name, PAYLOAD: payload})
        return self._execute_lambda_invoke(payload)

    def _execute_boto_call(self, boto_client_method) -> dict:
        """
        Calls the given boto client method with the current request parameters.

        Parameters
        ----------
        boto_client_method
            Bound boto client method, called with ``self.request_parameters`` as keyword arguments.

        Returns
        -------
        dict
            The response returned by the boto client method.

        Raises
        ------
        InvalidResourceBotoParameterException
            On ParamValidationError, or on a ClientError whose code is ``ValidationException``
            or ``InvalidRequestContentException``.
        ErrorBotoApiCallException
            On any other ClientError.
        """
        try:
            response = boto_client_method(**self.request_parameters)
        except ParamValidationError as param_val_ex:
            # Drop the reserved keys from the message text.
            details = str(param_val_ex).replace(f"{FUNCTION_NAME}, ", "").replace(f"{PAYLOAD}, ", "")
            raise InvalidResourceBotoParameterException(
                f"Invalid parameter key provided. {details}"
            ) from param_val_ex
        except ClientError as client_ex:
            error_code = boto_utils.get_client_error_code(client_ex)
            if error_code == "ValidationException":
                message = str(client_ex).replace("(ValidationException) ", "")
                raise InvalidResourceBotoParameterException(
                    f"Invalid parameter value provided. {message}"
                ) from client_ex
            if error_code == "InvalidRequestContentException":
                raise InvalidResourceBotoParameterException(client_ex) from client_ex
            raise ErrorBotoApiCallException(client_ex) from client_ex
        return cast(dict, response)

    @abstractmethod
    def _execute_lambda_invoke(self, payload: str) -> RemoteInvokeIterableResponseType:
        raise NotImplementedError()
class LambdaInvokeExecutor(AbstractLambdaInvokeExecutor):
    """
    Executor backed by the "invoke" method of the Lambda client.
    """

    def _execute_lambda_invoke(self, payload: str) -> RemoteInvokeIterableResponseType:
        """
        Invokes the function and yields the result according to the configured output format.

        JSON format yields the boto response dict as a single RemoteInvokeResponse.
        TEXT format yields the decoded ``LogResult`` (when present and non-empty) as
        RemoteInvokeLogOutput, followed by the decoded ``Payload`` body as RemoteInvokeResponse.
        """
        LOG.debug(
            "Calling lambda_client.invoke with FunctionName:%s, Payload:%s, parameters:%s",
            self._function_name,
            payload,
            self.request_parameters,
        )
        response = self._execute_boto_call(self._lambda_client.invoke)
        output_format = self._remote_output_format

        if output_format == RemoteInvokeOutputFormat.JSON:
            yield RemoteInvokeResponse(response)
        elif output_format == RemoteInvokeOutputFormat.TEXT:
            encoded_logs = response.get(LOG_RESULT)
            if encoded_logs:
                decoded_logs = base64.b64decode(encoded_logs).decode("utf-8")
                yield RemoteInvokeLogOutput(decoded_logs)
            body = cast(StreamingBody, response.get(PAYLOAD))
            yield RemoteInvokeResponse(body.read().decode("utf-8"))
class LambdaInvokeWithResponseStreamExecutor(AbstractLambdaInvokeExecutor):
    """
    Calls "invoke_with_response_stream" method of "lambda" service with given input.
    """

    def _execute_lambda_invoke(self, payload: str) -> RemoteInvokeIterableResponseType:
        """
        Invokes the function and yields the result according to the configured output format.

        JSON format yields the boto response dict as a single RemoteInvokeResponse.
        TEXT format iterates the ``EventStream``: every ``PayloadChunk`` is yielded as a decoded
        RemoteInvokeResponse, and the ``LogResult`` of an ``InvokeComplete`` event (when present)
        is yielded as RemoteInvokeLogOutput.

        Raises
        ------
        UnicodeDecodeError
            If the streamed payload is not valid UTF-8, including a stream that ends in the
            middle of a multi-byte character.
        """
        LOG.debug(
            "Calling lambda_client.invoke_with_response_stream with FunctionName:%s, Payload:%s, parameters:%s",
            self._function_name,
            payload,
            self.request_parameters,
        )
        lambda_response = self._execute_boto_call(self._lambda_client.invoke_with_response_stream)
        if self._remote_output_format == RemoteInvokeOutputFormat.JSON:
            yield RemoteInvokeResponse(lambda_response)
        if self._remote_output_format == RemoteInvokeOutputFormat.TEXT:
            # Chunk boundaries are byte boundaries, so a multi-byte character can be split across
            # two chunks. The incremental decoder buffers the partial bytes until the rest arrives;
            # decoding each chunk on its own would raise UnicodeDecodeError on valid output.
            utf8_decoder = codecs.getincrementaldecoder("utf-8")()
            event_stream: EventStream = lambda_response.get(EVENT_STREAM, [])
            for event in event_stream:
                if PAYLOAD_CHUNK in event:
                    # A chunk without a Payload entry is treated as an empty chunk.
                    chunk = event[PAYLOAD_CHUNK].get(PAYLOAD, b"")
                    yield RemoteInvokeResponse(utf8_decoder.decode(chunk))
                if INVOKE_COMPLETE in event:
                    invoke_complete = event[INVOKE_COMPLETE]
                    if LOG_RESULT in invoke_complete:
                        yield RemoteInvokeLogOutput(base64.b64decode(invoke_complete[LOG_RESULT]).decode("utf-8"))
            # Flush the decoder: bytes still buffered here mean the stream ended mid-character,
            # which is reported as UnicodeDecodeError instead of being silently dropped.
            utf8_decoder.decode(b"", final=True)
class DefaultConvertToJSON(RemoteInvokeRequestResponseMapper[RemoteInvokeExecutionInfo]):
    """
    Request mapper which makes sure an inline payload is a JSON string. A payload that
    does not parse as JSON is wrapped into a JSON string value.
    """

    def map(self, test_input: RemoteInvokeExecutionInfo) -> RemoteInvokeExecutionInfo:
        """
        Normalizes ``test_input.payload`` in place and returns the same object.

        Nothing is changed when a file is provided. Otherwise a missing/empty payload becomes
        "{}", and a payload that fails ``json.loads`` is replaced by its ``json.dumps`` form.
        """
        # File based events are passed through untouched.
        if test_input.is_file_provided():
            return test_input

        if not test_input.payload:
            LOG.debug("Input event not found, invoking resource with an empty event")
            test_input.payload = "{}"

        LOG.debug("Mapping input event to JSON string object")
        try:
            # Only used as a validity check, the parsed value is discarded.
            json.loads(cast(str, test_input.payload))
        except JSONDecodeError:
            converted_payload = json.dumps(test_input.payload)
            LOG.info(
                "Auto converting value '%s' into JSON '%s'. "
                "If you don't want auto-conversion, please provide a JSON string as event",
                test_input.payload,
                converted_payload,
            )
            test_input.payload = converted_payload
        return test_input
class LambdaResponseConverter(RemoteInvokeRequestResponseMapper[RemoteInvokeResponse]):
    """
    Response mapper for the Lambda invoke call. When the response dict carries a truthy
    'Payload' entry, that entry is read as a stream and replaced with its UTF-8 decoded text.
    """

    def map(self, remote_invoke_input: RemoteInvokeResponse) -> RemoteInvokeResponse:
        """
        Replaces the 'Payload' stream of the response with its decoded content, in place.

        Raises
        ------
        InvalideBotoResponseException
            If the response held by ``remote_invoke_input`` is not a dict.
        """
        LOG.debug("Mapping Lambda response to string object")
        lambda_response = remote_invoke_input.response
        if not isinstance(lambda_response, dict):
            raise InvalideBotoResponseException("Invalid response type received from Lambda service, expecting dict")

        payload_stream = lambda_response.get(PAYLOAD)
        if payload_stream:
            raw_payload = cast(StreamingBody, payload_stream).read()
            lambda_response[PAYLOAD] = raw_payload.decode("utf-8")
        return remote_invoke_input
class LambdaStreamResponseConverter(RemoteInvokeRequestResponseMapper[RemoteInvokeResponse]):
    """
    This class helps to convert response from lambda invoke_with_response_stream API call.
    That API call returns 'EventStream' which yields 'PayloadChunk's and 'InvokeComplete' as they become available.
    This mapper consumes the whole stream, decodes every 'PayloadChunk' and keeps 'InvokeComplete' events
    unchanged, then stores the resulting list back under 'EventStream' for the next mapper.
    """

    def map(self, remote_invoke_input: RemoteInvokeResponse) -> RemoteInvokeResponse:
        """
        Replaces the 'EventStream' of the response with a list of decoded events, in place.

        Raises
        ------
        InvalideBotoResponseException
            If the response held by ``remote_invoke_input`` is not a dict.
        UnicodeDecodeError
            If the streamed payload is not valid UTF-8, including a stream that ends in the
            middle of a multi-byte character.
        """
        LOG.debug("Mapping Lambda response to string object")
        if not isinstance(remote_invoke_input.response, dict):
            raise InvalideBotoResponseException("Invalid response type received from Lambda service, expecting dict")

        event_stream: EventStream = remote_invoke_input.response.get(EVENT_STREAM, [])
        # Chunk boundaries are byte boundaries, so a multi-byte character can be split across
        # two chunks. The incremental decoder carries the partial bytes over to the next chunk;
        # decoding each chunk on its own would raise UnicodeDecodeError on valid output.
        utf8_decoder = codecs.getincrementaldecoder("utf-8")()
        decoded_event_stream = []
        for event in event_stream:
            if PAYLOAD_CHUNK in event:
                # A chunk without a Payload entry is treated as an empty chunk.
                chunk = event[PAYLOAD_CHUNK].get(PAYLOAD, b"")
                decoded_event_stream.append({PAYLOAD_CHUNK: {PAYLOAD: utf8_decoder.decode(chunk)}})
            if INVOKE_COMPLETE in event:
                decoded_event_stream.append(event)
        # Flush the decoder: bytes still buffered here mean the stream ended mid-character,
        # which is reported as UnicodeDecodeError instead of being silently dropped.
        utf8_decoder.decode(b"", final=True)

        remote_invoke_input.response[EVENT_STREAM] = decoded_event_stream
        return remote_invoke_input
def _is_function_invoke_mode_response_stream(lambda_client: LambdaClient, function_name: str):
    """
    Tells whether the function URL configuration of the given function uses RESPONSE_STREAM as InvokeMode.

    Parameters
    ----------
    lambda_client: LambdaClient
        Client used to call get_function_url_config
    function_name: str
        Value passed as FunctionName to that call

    Returns
    -------
    bool
        True when the returned InvokeMode equals RESPONSE_STREAM. False otherwise, including
        when the lookup raises ClientError (logged at debug level, regular invoke is used then).
    """
    try:
        url_config = lambda_client.get_function_url_config(FunctionName=function_name)
    except ClientError as ex:
        LOG.debug("Function %s, doesn't have Function URL configured, using regular invoke", function_name, exc_info=ex)
        return False

    invoke_mode = url_config.get(INVOKE_MODE)
    LOG.debug("InvokeMode of function %s: %s", function_name, invoke_mode)
    return invoke_mode == RESPONSE_STREAM