samcli/lib/shared_test_events/lambda_shared_test_event.py (221 lines of code) (raw):
"""For operations specific to Lambda shared test event schemas"""
import json
import logging
from json import JSONDecodeError
from typing import Any
import botocore
from samcli.commands.exceptions import SchemasApiException
from samcli.commands.local.cli_common.user_exceptions import ResourceNotFound, SchemaPermissionsError
from samcli.commands.remote.exceptions import (
DuplicateEventName,
EventTooLarge,
InvalidSchema,
ResourceNotSupportedForTestEvents,
)
from samcli.lib.schemas.schemas_api_caller import SchemasApiCaller
from samcli.lib.utils.cloudformation import CloudFormationResourceSummary
from samcli.lib.utils.resources import AWS_LAMBDA_FUNCTION
# Default registry name used by the methods in this module when no registry_name is supplied
LAMBDA_TEST_EVENT_REGISTRY = "lambda-testevent-schemas"
# remove_event() returns an empty schema (meaning "delete the whole schema") when the
# schema holds this many events or fewer
MIN_EVENTS = 1
# limit_versions() deletes the oldest schema version once this many versions exist
MAX_SCHEMA_VERSIONS = 5
# 6MB Lambda invocation payload limit
# NOTE(review): 6144000 is 6000 * 1024 bytes, whereas 6 MiB would be 6291456 -- confirm the intended value
MAX_EVENT_SIZE = 6144000
# Schema type passed to the Schemas API caller when discovering, creating or updating schemas
OPEN_API_TYPE = "OpenApi3"
# Message carried by the SchemaPermissionsError raised when a Schemas call fails with "ForbiddenException"
SCHEMA_PERMISSIONS_ERROR = """
You don't have the neccesary permissions to create shareable test events.
Update your role to have the necessary permissions or change your event sharing settings to private.
Learn more: https://docs.aws.amazon.com/lambda/latest/dg/testing-functions.html#creating-shareable-events
"""
# Module-level logger
LOG = logging.getLogger(__name__)
class NoPermissionExceptionWrapper:
    """
    Proxy around an ApiCaller object that translates permission failures.

    Every attribute that is not found on the proxy itself is exposed as a callable that
    forwards to the attribute of the same name on the wrapped object. When the forwarded
    call raises a botocore ClientError whose error code is "ForbiddenException", a
    SchemaPermissionsError with a custom message is raised instead. Any other error
    propagates unchanged.
    """

    def __init__(self, api: SchemasApiCaller):
        self.api: SchemasApiCaller = api

    def __getattr__(self, attr):
        # Only reached for names that regular attribute lookup did not find on the proxy.
        # The wrapped attribute is resolved lazily, at call time, inside the try block.
        def wrapper(*args, **kwargs):
            try:
                return getattr(self.api, attr)(*args, **kwargs)
            except botocore.exceptions.ClientError as ex:
                if ex.response.get("Error", {}).get("Code") == "ForbiddenException":
                    # Chain explicitly so the underlying API error stays attached as the cause
                    raise SchemaPermissionsError(SCHEMA_PERMISSIONS_ERROR) from ex
                raise

        return wrapper

    @classmethod
    def wrap(cls, api: SchemasApiCaller) -> SchemasApiCaller:
        """
        Build a proxy around `api`, typed as a SchemasApiCaller for the benefit of callers.

        Parameters
        ----------
        api : SchemasApiCaller
            The object whose calls should be guarded

        Returns
        -------
        SchemasApiCaller
            A NoPermissionExceptionWrapper instance wrapping `api`
        """
        # The wrapper behaves effectively like the SchemasApiCaller it's wrapping
        wrapper: SchemasApiCaller = NoPermissionExceptionWrapper(api)  # type: ignore
        return wrapper
class LambdaSharedTestEvent:
def __init__(self, schema_api_caller: SchemasApiCaller, lambda_client):
    """
    Parameters
    ----------
    schema_api_caller : SchemasApiCaller
        Caller used for all Schemas operations; it is wrapped with NoPermissionExceptionWrapper
    lambda_client
        Client used to call get_function_configuration when a function is identified by ARN
    """
    self._lambda_client = lambda_client
    guarded_caller: SchemasApiCaller = NoPermissionExceptionWrapper.wrap(schema_api_caller)
    self._api_caller: SchemasApiCaller = guarded_caller
def _validate_schema_dict(self, schema_dict: dict):
# For our purposes, the only field we care exists is "components"
# All other parts of the schema are generated externally, so we do not verify these
if "components" not in schema_dict:
raise InvalidSchema(f"Schema {json.dumps(schema_dict)} is not valid")
def _validate_event_size(self, event: str):
    """
    Ensure the event fits in MAX_EVENT_SIZE once encoded as UTF-8

    Parameters
    ----------
    event : str
        Contents of the event to validate

    Raises
    ------
    EventTooLarge
        When the encoded event is bigger than the accepted Lambda invocation payload size
    """
    encoded_length = len(event.encode("utf8"))
    if encoded_length <= MAX_EVENT_SIZE:
        return

    raise EventTooLarge(
        "Event is bigger than the accepted Lambda invocation payload size. "
        "Learn more at https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html#function-configuration-deployment-and-execution"
    )
def _get_schema_name(self, function_resource: CloudFormationResourceSummary) -> str:
    """
    Get the schema name for a specific function according to the Lambda convention

    Parameters
    ----------
    function_resource : CloudFormationResourceSummary
        The function resource

    Returns
    -------
    str
        The schema name, in the form "_<function name>-schema"

    Raises
    ------
    ResourceNotSupportedForTestEvents
        When the resource is not of type AWS_LAMBDA_FUNCTION
    ResourceNotFound
        When the physical id is an ARN and Lambda answers "ResourceNotFoundException" for it
    """
    if function_resource.resource_type != AWS_LAMBDA_FUNCTION:
        raise ResourceNotSupportedForTestEvents(
            f"Resource type {function_resource.resource_type} is not supported for remote test events."
        )

    function_name = function_resource.physical_resource_id
    if function_name.startswith("arn:"):
        # If it's an ARN, we check that the function exists and get its name.
        # Only the API call is guarded: it is the one statement that can raise ClientError.
        try:
            function_config = self._lambda_client.get_function_configuration(FunctionName=function_name)
        except botocore.exceptions.ClientError as ex:
            if ex.response.get("Error", {}).get("Code") == "ResourceNotFoundException":
                # Chain explicitly so the original API error is kept as the cause
                raise ResourceNotFound(
                    f"Function not found when trying to read events: {function_name}"
                ) from ex
            raise
        function_name = function_config["FunctionName"]

    return f"_{function_name}-schema"
def limit_versions(self, schema_name: str, registry_name: str = LAMBDA_TEST_EVENT_REGISTRY):
    """
    Delete the oldest existing version once the schema has MAX_SCHEMA_VERSIONS versions or more.
    At most one version is deleted per call.

    Parameters
    ----------
    schema_name : str
        Schema to limit versions
    registry_name : str
        Registry name
    """
    versions = self._api_caller.list_schema_versions(registry_name, schema_name)
    if len(versions) < MAX_SCHEMA_VERSIONS:
        return

    # The first entry of the listing is taken to be the oldest version
    oldest_version = versions[0]
    LOG.debug("Over %s versions, deleting version %s", MAX_SCHEMA_VERSIONS, oldest_version)
    self._api_caller.delete_version(registry_name, schema_name, oldest_version)
def add_event_to_schema(self, schema: str, event: str, event_name: str, replace_if_exists: bool = False) -> str:
    """
    Adds an event to a schema

    Parameters
    ---------
    schema:
        The schema (JSON string) the event will be added to
    event:
        The event (JSON string) that will be added to the schema
    event_name:
        The name of the event that will be added to the schema
    replace_if_exists:
        When True, an existing event with the same name is overwritten instead of rejected

    Returns
    -------
    The updated schema with the event added to the "examples" section

    Raises
    ------
    EventTooLarge
        When the event exceeds the accepted payload size
    InvalidSchema
        When the schema does not have the expected structure
    DuplicateEventName
        When the event name already exists and replace_if_exists is False
    SchemasApiException
        When either the schema or the event cannot be parsed as JSON
    """
    self._validate_event_size(event)

    try:
        schema_dict = json.loads(schema)
    except JSONDecodeError as ex:
        LOG.error("Error parsing schema when adding event %s", event_name)
        raise SchemasApiException(
            "Parse error reading the content from Schemas response. "
            "This should not be possible, please raise an issue."
        ) from ex
    self._validate_schema_dict(schema_dict)

    # The event is supplied by the caller, so a parse failure here is reported as a problem
    # with the event itself rather than blamed on the Schemas response.
    try:
        event_dict = json.loads(event)
    except JSONDecodeError as ex:
        LOG.error("Error parsing the contents of event %s", event_name)
        raise SchemasApiException(f"Event {event_name} is not valid JSON.") from ex

    # "examples" is missing when we freshly generate a schema
    examples = schema_dict["components"].setdefault("examples", {})
    if event_name in examples and not replace_if_exists:
        raise DuplicateEventName(f"Event {event_name} already exists. You can replace it with `--force`.")

    examples[event_name] = {
        "value": event_dict,
    }
    return json.dumps(schema_dict)
def get_event_from_schema(self, schema: str, event_name: str) -> Any:
"""
Gets the specified event from the provided schema
Parameters
----------
schema:
the schema string that contains the event
event_name:
the name of the event to retrieve from the schema
Returns
-------
The event data of the event "event_name"
"""
try:
schema_dict = json.loads(schema)
self._validate_schema_dict(schema_dict)
existing_events = schema_dict["components"].get("examples", {})
if event_name not in existing_events or "value" not in existing_events[event_name]:
raise ResourceNotFound(f"Event {event_name} not found")
# can use square brackets here since we know each of these subdicts exist
return existing_events[event_name]["value"]
except JSONDecodeError as ex:
LOG.error("Error decoding schema when getting event %s", event_name)
raise SchemasApiException(
"Parse error reading the content from Schemas response. "
"This should not be possible, please raise an issue."
) from ex
def remove_event(self, schema: str, event_name: str) -> str:
    """
    Removes an event from a schema

    Parameters
    ----------
    schema:
        The schema (JSON string) the event will be removed from
    event_name:
        The name of the event that will be removed from the schema

    Returns
    -------
    The updated schema with the event removed, or an empty string when the schema holds
    MIN_EVENTS events or fewer (the entire schema is meant to be deleted in that case)

    Raises
    ------
    SchemasApiException
        When the schema is not valid JSON
    InvalidSchema
        When the schema does not have the expected structure
    ResourceNotFound
        When the schema does not contain the event
    """
    try:
        parsed_schema = json.loads(schema)
    except JSONDecodeError as ex:
        LOG.error("Error parsing schema while removing event %s", event_name)
        raise SchemasApiException(
            "Parse error reading the content from Schemas response. "
            "This should not be possible, please raise an issue."
        ) from ex

    self._validate_schema_dict(parsed_schema)
    components = parsed_schema["components"]
    if event_name not in components.get("examples", {}):
        raise ResourceNotFound(f"Event {event_name} not found")

    examples = components["examples"]
    if len(examples) <= MIN_EVENTS:
        LOG.debug("event %s is only event in schema, entire schema will be deleted", event_name)
        return ""

    del examples[event_name]
    return json.dumps(parsed_schema)
def create_event(
    self,
    event_name: str,
    function_resource: CloudFormationResourceSummary,
    event_data: str,
    force: bool = False,
    registry_name: str = LAMBDA_TEST_EVENT_REGISTRY,
) -> str:
    """
    Generates a new event and adds it to the Schemas registry

    Parameters
    ----------
    event_name: str
        The name of the event to be created
    function_resource: CloudFormationResourceSummary
        The function where the event will be created
    event_data: str
        The JSON data of the event to be created
    force: bool
        When the function already has a schema, replace an existing event of the same name
        instead of failing
    registry_name: str
        The name of the registry that contains the schema

    Returns
    -------
    str
        The schema, including the new event, that was sent to the registry
    """
    api_caller = self._api_caller
    if not api_caller.check_registry_exists(registry_name):
        api_caller.create_registry(registry_name)

    schema_name = self._get_schema_name(function_resource)
    original_schema = api_caller.get_schema(registry_name, schema_name)

    if original_schema:
        LOG.debug("Schema %s already exists, adding event %s", schema_name, event_name)
        # Build the updated schema before touching existing versions: if the event is
        # rejected (too large, duplicate name without force, unparsable), nothing has
        # been deleted yet.
        schema = self.add_event_to_schema(original_schema, event_data, event_name, replace_if_exists=force)
        self.limit_versions(schema_name, registry_name)
        api_caller.update_schema(schema, registry_name, schema_name, OPEN_API_TYPE)
    else:
        LOG.debug("Schema %s does not already exist, creating new", schema_name)
        schema = api_caller.discover_schema(event_data, OPEN_API_TYPE)
        schema = self.add_event_to_schema(schema, event_data, event_name)
        api_caller.create_schema(schema, registry_name, schema_name, OPEN_API_TYPE)

    return schema
def delete_event(
    self,
    event_name: str,
    function_resource: CloudFormationResourceSummary,
    registry_name: str = LAMBDA_TEST_EVENT_REGISTRY,
):
    """
    Deletes a remote test event, deleting the whole schema when no event would be left in it

    Parameters
    ----------
    event_name: str
        The name of the event to be deleted
    function_resource: CloudFormationResourceSummary
        The function that will have the event deleted
    registry_name: str
        The name of the registry that contains the schema

    Raises
    ------
    ResourceNotFound
        When the registry does not exist, the function has no schema, or the event is not found
    """
    schemas_api = self._api_caller
    registry_found = schemas_api.check_registry_exists(registry_name)
    if not registry_found:
        raise ResourceNotFound(f"{registry_name} registry not found. There are no saved events.")

    schema_name = self._get_schema_name(function_resource)
    current_schema = schemas_api.get_schema(registry_name, schema_name)
    function_name = function_resource.logical_resource_id
    if not current_schema:
        raise ResourceNotFound(f"No events found for function {function_name}")

    # remove_event hands back an empty string when the whole schema should be deleted
    remaining_schema = self.remove_event(current_schema, event_name)
    if remaining_schema:
        LOG.debug("Multiple events in schema %s, updating schema for function %s", schema_name, function_name)
        self.limit_versions(schema_name, registry_name)
        schemas_api.update_schema(remaining_schema, registry_name, schema_name, OPEN_API_TYPE)
        return

    LOG.debug("Only one event in schema %s, deleting schema for function %s", schema_name, function_name)
    schemas_api.delete_schema(registry_name, schema_name)
def get_event(
    self,
    event_name: str,
    function_resource: CloudFormationResourceSummary,
    registry_name: str = LAMBDA_TEST_EVENT_REGISTRY,
) -> str:
    """
    Returns a remote test event

    Parameters
    ----------
    event_name: str
        The name of the event to be fetched
    function_resource: CloudFormationResourceSummary
        The function that has the event
    registry_name: str
        The name of the registry that contains the schema

    Returns
    -------
    The JSON data of the event

    Raises
    ------
    ResourceNotFound
        When the registry does not exist, the function has no schema, or the event is not found
    """
    api_caller = self._api_caller
    if not api_caller.check_registry_exists(registry_name):
        raise ResourceNotFound(f"{registry_name} registry not found. There are no saved events.")

    schema_name = self._get_schema_name(function_resource)
    schema = api_caller.get_schema(registry_name, schema_name)
    function_name = function_resource.logical_resource_id
    if not schema:
        raise ResourceNotFound(f"No events found for function {function_name}")

    event = self.get_event_from_schema(schema, event_name)
    # No JSONDecodeError handler is needed here: that exception is only raised when
    # decoding, and parse errors of the schema are already handled by get_event_from_schema.
    return json.dumps(event)
def list_events(
    self,
    function_resource: CloudFormationResourceSummary,
    registry_name: str = LAMBDA_TEST_EVENT_REGISTRY,
) -> str:
    """
    Lists the names of the remote test events saved for a function

    Parameters
    ----------
    function_resource : CloudFormationResourceSummary
        The function to list the test events from
    registry_name : str, optional
        Registry name, by default LAMBDA_TEST_EVENT_REGISTRY

    Returns
    -------
    str
        Function's event names, separated by a new line

    Raises
    ------
    ResourceNotFound
        When the registry does not exist or the function has no schema
    SchemasApiException
        When the schema is not valid JSON
    """
    schemas_api = self._api_caller
    registry_found = schemas_api.check_registry_exists(registry_name)
    if not registry_found:
        raise ResourceNotFound(f"{registry_name} registry not found. There are no saved events.")

    schema_name = self._get_schema_name(function_resource)
    schema = schemas_api.get_schema(registry_name, schema_name)
    function_name = function_resource.logical_resource_id
    if not schema:
        raise ResourceNotFound(f"No events found for function {function_name}")

    try:
        parsed_schema = json.loads(schema)
    except JSONDecodeError as ex:
        LOG.error("Error parsing schema %s", schema_name)
        raise SchemasApiException(
            "Parse error reading the content from Schemas response. "
            "This should not be possible, please raise an issue."
        ) from ex

    # Missing "components" or "examples" sections simply yield an empty listing
    components = parsed_schema.get("components", {})
    event_names = components.get("examples", {}).keys()
    return "\n".join(event_names)