samtranslator/model/eventsources/pull.py (591 lines of code) (raw):
from abc import ABCMeta, abstractmethod
from typing import Any, Dict, List, Optional, Tuple
from samtranslator.internal.deprecation_control import deprecated
from samtranslator.metrics.method_decorator import cw_timer
from samtranslator.model import PassThroughProperty, Property, PropertyType, ResourceMacro
from samtranslator.model.eventsources import FUNCTION_EVETSOURCE_METRIC_PREFIX
from samtranslator.model.exceptions import InvalidEventException
from samtranslator.model.iam import IAMRolePolicies
from samtranslator.model.intrinsics import is_intrinsic
from samtranslator.model.lambda_ import LambdaEventSourceMapping
from samtranslator.model.types import IS_BOOL, IS_DICT, IS_INT, IS_LIST, IS_STR, PassThrough
from samtranslator.translator.arn_generator import ArnGenerator
from samtranslator.utils.types import Intrinsicable
from samtranslator.validator.value_validator import sam_expect
class PullEventSource(ResourceMacro, metaclass=ABCMeta):
"""Base class for pull event sources for SAM Functions.
The pull events are Kinesis Streams, DynamoDB Streams, Kafka Topics, Amazon MQ Queues, SQS Queues, and DocumentDB Clusters. All of these correspond to an
EventSourceMapping in Lambda, and require that the execution role be given to Kinesis Streams, DynamoDB
Streams, or SQS Queues, respectively.
:cvar str policy_arn: The ARN of the AWS managed role policy corresponding to this pull event source
"""
# Event types that support `FilterCriteria`, stored as a list to keep the alphabetical order
RESOURCE_TYPES_WITH_EVENT_FILTERING = ["DocumentDB", "DynamoDB", "Kinesis", "MQ", "MSK", "SelfManagedKafka", "SQS"]
# Note(xinhol): `PullEventSource` should have been an abstract class. Disabling the type check for the next
# line to avoid any potential behavior change.
# TODO: Make `PullEventSource` an abstract class and not giving `resource_type` initial value.
resource_type: str = None # type: ignore
relative_id: str # overriding the Optional[str]: for event, relative id is not None
property_types: Dict[str, PropertyType] = {
"BatchSize": PropertyType(False, IS_INT),
"StartingPosition": PassThroughProperty(False),
"StartingPositionTimestamp": PassThroughProperty(False),
"Enabled": PropertyType(False, IS_BOOL),
"MaximumBatchingWindowInSeconds": PropertyType(False, IS_INT),
"MaximumRetryAttempts": PropertyType(False, IS_INT),
"BisectBatchOnFunctionError": PropertyType(False, IS_BOOL),
"MaximumRecordAgeInSeconds": PropertyType(False, IS_INT),
"DestinationConfig": PropertyType(False, IS_DICT),
"ParallelizationFactor": PropertyType(False, IS_INT),
"Topics": PropertyType(False, IS_LIST),
"Queues": PropertyType(False, IS_LIST),
"SourceAccessConfigurations": PropertyType(False, IS_LIST),
"SecretsManagerKmsKeyId": PropertyType(False, IS_STR),
"TumblingWindowInSeconds": PropertyType(False, IS_INT),
"FunctionResponseTypes": PropertyType(False, IS_LIST),
"KafkaBootstrapServers": PropertyType(False, IS_LIST),
"FilterCriteria": PropertyType(False, IS_DICT),
"KmsKeyArn": PassThroughProperty(False),
"ConsumerGroupId": PropertyType(False, IS_STR),
"ScalingConfig": PropertyType(False, IS_DICT),
"ProvisionedPollerConfig": PropertyType(False, IS_DICT),
"MetricsConfig": PropertyType(False, IS_DICT),
}
BatchSize: Optional[Intrinsicable[int]]
StartingPosition: Optional[PassThrough]
StartingPositionTimestamp: Optional[PassThrough]
Enabled: Optional[bool]
MaximumBatchingWindowInSeconds: Optional[Intrinsicable[int]]
MaximumRetryAttempts: Optional[Intrinsicable[int]]
BisectBatchOnFunctionError: Optional[Intrinsicable[bool]]
MaximumRecordAgeInSeconds: Optional[Intrinsicable[int]]
DestinationConfig: Optional[Dict[str, Any]]
ParallelizationFactor: Optional[Intrinsicable[int]]
Topics: Optional[List[Any]]
Queues: Optional[List[Any]]
SourceAccessConfigurations: Optional[List[Any]]
SecretsManagerKmsKeyId: Optional[str]
TumblingWindowInSeconds: Optional[Intrinsicable[int]]
FunctionResponseTypes: Optional[List[Any]]
KafkaBootstrapServers: Optional[List[Any]]
FilterCriteria: Optional[Dict[str, Any]]
KmsKeyArn: Optional[Intrinsicable[str]]
ConsumerGroupId: Optional[Intrinsicable[str]]
ScalingConfig: Optional[Dict[str, Any]]
ProvisionedPollerConfig: Optional[Dict[str, Any]]
MetricsConfig: Optional[Dict[str, Any]]
@abstractmethod
def get_policy_arn(self) -> Optional[str]:
"""Policy to be added to the role (if a role applies)."""
@abstractmethod
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
"""Inline policy statements to be added to the role (if a role applies)."""
@abstractmethod
def get_event_source_arn(self) -> Optional[PassThrough]:
"""Return the value to assign to lambda event source mapping's EventSourceArn."""
def add_extra_eventsourcemapping_fields(self, _lambda_eventsourcemapping: LambdaEventSourceMapping) -> None:
"""Adds extra fields to the CloudFormation ESM resource.
This method can be overriden by a subclass if it has extra fields specific to that subclass.
:param LambdaEventSourceMapping lambda_eventsourcemapping: the Event source mapping resource to add the fields to.
"""
return
@cw_timer(prefix=FUNCTION_EVETSOURCE_METRIC_PREFIX)
def to_cloudformation(self, **kwargs): # type: ignore[no-untyped-def] # noqa: PLR0912, PLR0915
"""Returns the Lambda EventSourceMapping to which this pull event corresponds. Adds the appropriate managed
policy to the function's execution role, if such a role is provided.
:param dict kwargs: a dict containing the execution role generated for the function
:returns: a list of vanilla CloudFormation Resources, to which this pull event expands
:rtype: list
"""
function = kwargs.get("function")
if not function:
raise TypeError("Missing required keyword argument: function")
resources = []
lambda_eventsourcemapping = LambdaEventSourceMapping(
self.logical_id, attributes=function.get_passthrough_resource_attributes()
)
resources.append(lambda_eventsourcemapping)
try:
# Name will not be available for Alias resources
function_name_or_arn = function.get_runtime_attr("name")
except KeyError:
function_name_or_arn = function.get_runtime_attr("arn")
lambda_eventsourcemapping.FunctionName = function_name_or_arn
lambda_eventsourcemapping.EventSourceArn = self.get_event_source_arn()
lambda_eventsourcemapping.StartingPosition = self.StartingPosition
lambda_eventsourcemapping.StartingPositionTimestamp = self.StartingPositionTimestamp
lambda_eventsourcemapping.BatchSize = self.BatchSize
lambda_eventsourcemapping.Enabled = self.Enabled
lambda_eventsourcemapping.MaximumBatchingWindowInSeconds = self.MaximumBatchingWindowInSeconds
lambda_eventsourcemapping.MaximumRetryAttempts = self.MaximumRetryAttempts
lambda_eventsourcemapping.BisectBatchOnFunctionError = self.BisectBatchOnFunctionError
lambda_eventsourcemapping.MaximumRecordAgeInSeconds = self.MaximumRecordAgeInSeconds
lambda_eventsourcemapping.ParallelizationFactor = self.ParallelizationFactor
lambda_eventsourcemapping.Topics = self.Topics
lambda_eventsourcemapping.Queues = self.Queues
lambda_eventsourcemapping.SourceAccessConfigurations = self.SourceAccessConfigurations
lambda_eventsourcemapping.TumblingWindowInSeconds = self.TumblingWindowInSeconds
lambda_eventsourcemapping.FunctionResponseTypes = self.FunctionResponseTypes
lambda_eventsourcemapping.FilterCriteria = self.FilterCriteria
lambda_eventsourcemapping.KmsKeyArn = self.KmsKeyArn
lambda_eventsourcemapping.ScalingConfig = self.ScalingConfig
lambda_eventsourcemapping.ProvisionedPollerConfig = self.ProvisionedPollerConfig
lambda_eventsourcemapping.MetricsConfig = self.MetricsConfig
self._validate_filter_criteria()
if self.KafkaBootstrapServers:
lambda_eventsourcemapping.SelfManagedEventSource = {
"Endpoints": {"KafkaBootstrapServers": self.KafkaBootstrapServers}
}
if self.ConsumerGroupId:
consumer_group_id_structure = {"ConsumerGroupId": self.ConsumerGroupId}
if self.resource_type == "MSK":
lambda_eventsourcemapping.AmazonManagedKafkaEventSourceConfig = consumer_group_id_structure
elif self.resource_type == "SelfManagedKafka":
lambda_eventsourcemapping.SelfManagedKafkaEventSourceConfig = consumer_group_id_structure
else:
raise InvalidEventException(
self.logical_id,
f"Property ConsumerGroupId not defined for resource of type {self.resource_type}.",
)
destination_config_policy: Optional[Dict[str, Any]] = None
if self.DestinationConfig:
on_failure: Dict[str, Any] = sam_expect(
self.DestinationConfig.get("OnFailure"),
self.logical_id,
"DestinationConfig.OnFailure",
is_sam_event=True,
).to_be_a_map()
# `Type` property is for sam to attach the right policies
destination_type = on_failure.get("Type")
# SAM attaches the policies for SQS, SNS or S3 only if 'Type' is given
if destination_type:
# delete this field as its used internally for SAM to determine the policy
del on_failure["Type"]
# the values 'SQS', 'SNS', and 'S3' are allowed. No intrinsics are allowed
if destination_type not in ["SQS", "SNS", "S3"]:
raise InvalidEventException(
self.logical_id, "The only valid values for 'Type' are 'SQS', 'SNS', and 'S3'"
)
if destination_type == "SQS":
queue_arn = on_failure.get("Destination")
destination_config_policy = IAMRolePolicies().sqs_send_message_role_policy(
queue_arn, self.logical_id
)
elif destination_type == "SNS":
sns_topic_arn = on_failure.get("Destination")
destination_config_policy = IAMRolePolicies().sns_publish_role_policy(
sns_topic_arn, self.logical_id
)
elif destination_type == "S3":
s3_arn = on_failure.get("Destination")
destination_config_policy = IAMRolePolicies().s3_send_event_payload_role_policy(
s3_arn, self.logical_id
)
lambda_eventsourcemapping.DestinationConfig = self.DestinationConfig
self.add_extra_eventsourcemapping_fields(lambda_eventsourcemapping)
if "role" in kwargs:
self._link_policy(kwargs["role"], destination_config_policy) # type: ignore[no-untyped-call]
return resources
def _link_policy(self, role, destination_config_policy=None): # type: ignore[no-untyped-def]
"""If this source triggers a Lambda function whose execution role is auto-generated by SAM, add the
appropriate managed policy to this Role.
:param model.iam.IAMRole role: the execution role generated for the function
"""
policy_arn = self.get_policy_arn()
policy_statements = self.get_policy_statements()
if role is not None:
if policy_arn is not None and policy_arn not in role.ManagedPolicyArns:
role.ManagedPolicyArns.append(policy_arn)
if policy_statements is not None:
if role.Policies is None:
role.Policies = []
for policy in policy_statements:
if policy not in role.Policies and policy.get("PolicyDocument") not in [
d["PolicyDocument"] for d in role.Policies
]:
role.Policies.append(policy)
# add SQS or SNS policy only if role is present in kwargs
if role is not None and destination_config_policy is not None and destination_config_policy:
if role.Policies is None:
role.Policies = []
role.Policies.append(destination_config_policy)
if role.Policies and destination_config_policy not in role.Policies:
policy_document = destination_config_policy.get("PolicyDocument")
# do not add the policy if the same policy document is already present
if policy_document not in [d.get("PolicyDocument", {}) for d in role.Policies]:
role.Policies.append(destination_config_policy)
def _validate_filter_criteria(self) -> None:
if not self.FilterCriteria or is_intrinsic(self.FilterCriteria):
return
if self.resource_type not in self.RESOURCE_TYPES_WITH_EVENT_FILTERING:
raise InvalidEventException(
self.relative_id,
"FilterCriteria is only available for {} events.".format(
", ".join(self.RESOURCE_TYPES_WITH_EVENT_FILTERING)
),
)
# FilterCriteria is either empty or only has "Filters"
if list(self.FilterCriteria.keys()) not in [[], ["Filters"]]:
raise InvalidEventException(self.relative_id, "FilterCriteria field has a wrong format")
def validate_secrets_manager_kms_key_id(self) -> None:
if self.SecretsManagerKmsKeyId:
sam_expect(
self.SecretsManagerKmsKeyId, self.relative_id, "SecretsManagerKmsKeyId", is_sam_event=True
).to_be_a_string()
def _validate_source_access_configurations(self, supported_types: List[str], required_type: str) -> str:
"""
Validate the SourceAccessConfigurations parameter and return the URI to
be used for policy statement creation.
"""
if not self.SourceAccessConfigurations:
raise InvalidEventException(
self.relative_id,
f"No SourceAccessConfigurations for Amazon {self.resource_type} event provided.",
)
if not isinstance(self.SourceAccessConfigurations, list):
raise InvalidEventException(
self.relative_id,
"Provided SourceAccessConfigurations cannot be parsed into a list.",
)
required_type_uri: Optional[str] = None
for index, conf in enumerate(self.SourceAccessConfigurations):
sam_expect(conf, self.relative_id, f"SourceAccessConfigurations[{index}]", is_sam_event=True).to_be_a_map()
event_type: str = sam_expect(
conf.get("Type"), self.relative_id, f"SourceAccessConfigurations[{index}].Type", is_sam_event=True
).to_be_a_string()
if event_type not in supported_types:
raise InvalidEventException(
self.relative_id,
f"Invalid property Type specified in SourceAccessConfigurations. The supported values are: {supported_types}.",
)
if event_type == required_type:
if required_type_uri:
raise InvalidEventException(
self.relative_id,
f"Multiple {required_type} properties specified in SourceAccessConfigurations.",
)
required_type_uri = conf.get("URI")
if not required_type_uri:
raise InvalidEventException(
self.relative_id,
f"No {required_type} URI property specified in SourceAccessConfigurations.",
)
if not required_type_uri:
raise InvalidEventException(
self.relative_id,
f"No {required_type} property specified in SourceAccessConfigurations.",
)
return required_type_uri
@staticmethod
def _get_kms_decrypt_policy(secrets_manager_kms_key_id: str) -> Dict[str, Any]:
return {
"Action": ["kms:Decrypt"],
"Effect": "Allow",
"Resource": {
"Fn::Sub": "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/"
+ secrets_manager_kms_key_id
},
}
class Kinesis(PullEventSource):
"""Kinesis event source."""
resource_type = "Kinesis"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Stream": PassThroughProperty(True),
"StartingPosition": PassThroughProperty(True),
}
Stream: PassThrough
def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Stream
def get_policy_arn(self) -> Optional[str]:
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaKinesisExecutionRole")
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
return None
class DynamoDB(PullEventSource):
"""DynamoDB Streams event source."""
resource_type = "DynamoDB"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Stream": PassThroughProperty(True),
"StartingPosition": PassThroughProperty(True),
}
Stream: PassThrough
def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Stream
def get_policy_arn(self) -> Optional[str]:
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaDynamoDBExecutionRole")
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
return None
class SQS(PullEventSource):
"""SQS Queue event source."""
resource_type = "SQS"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Queue": PassThroughProperty(True),
}
Queue: PassThrough
def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Queue
def get_policy_arn(self) -> Optional[str]:
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaSQSQueueExecutionRole")
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
return None
class MSK(PullEventSource):
"""MSK event source."""
resource_type = "MSK"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Stream": PassThroughProperty(True),
"StartingPosition": PassThroughProperty(True),
}
Stream: PassThrough
def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Stream
def get_policy_arn(self) -> Optional[str]:
return ArnGenerator.generate_aws_managed_policy_arn("service-role/AWSLambdaMSKExecutionRole")
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
if self.SourceAccessConfigurations:
for conf in self.SourceAccessConfigurations:
# Lambda does not support multiple CLIENT_CERTIFICATE_TLS_AUTH configurations
if isinstance(conf, dict) and conf.get("Type") == "CLIENT_CERTIFICATE_TLS_AUTH" and conf.get("URI"):
return [
{
"PolicyName": "MSKExecutionRolePolicy",
"PolicyDocument": {
"Statement": [
{
"Action": [
"secretsmanager:GetSecretValue",
],
"Effect": "Allow",
"Resource": conf.get("URI"),
}
]
},
}
]
return None
class MQ(PullEventSource):
"""MQ event source."""
resource_type = "MQ"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Broker": PassThroughProperty(True),
"DynamicPolicyName": Property(False, IS_BOOL),
}
Broker: PassThrough
DynamicPolicyName: Optional[bool]
@property
def _policy_name(self) -> str:
"""Generate policy name based on DynamicPolicyName flag and MQ logical ID.
Policy name is required though its update is "No interuption".
https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-iam-policy.html#cfn-iam-policy-policyname #noqa
Historically, policy name was hardcoded as `SamAutoGeneratedAMQPolicy` but it led to a policy name clash
and failure to deploy, if a Function had at least 2 MQ event sources.
Since policy is attached to the Lambda execution role,
policy name should be based on MQ logical ID not to clash with policy names of other MQ event sources.
However, to support backwards compatibility, we need to keep policy `SamAutoGeneratedAMQPolicy` by default,
because customers might have code which relys on that policy name consistancy.
To support both old policy name and ability to have more than one MQ event source, we introduce new field
`DynamicPolicyName` which when set to true will use MQ logical ID to generate policy name.
Q: Why to introduce a new field and not to make policy name dynamic by default if there are multiple
MQ event sources?
A: Since a customer could have a single MQ source and rely on it's policy name in their code. If that customer
decides to add a new MQ source, they don't want to change the policy name for the first MQ all over their
code base. But they can opt in using a dynamic policy name for all other MQ sources they add.
Q: Why not use dynamic policy names automatically for all MQ event sources but first?
A: SAM-T doesn't have state and doesn't know what was the CFN resource attribute in a previous transformation.
Hence, trying to "use dynamic policy names automatically for all MQ event sources but first" can rely only
on event source order. If a customer added a new MQ source __before__ an old one, an old one would receive
a dynamic name and would break (potentially) customer's code.
Returns
-------
Name of the policy which will be attached to the Lambda Execution role.
"""
return f"{self.logical_id}AMQPolicy" if self.DynamicPolicyName else "SamAutoGeneratedAMQPolicy"
def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Broker
def get_policy_arn(self) -> Optional[str]:
return None
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
basic_auth_uri = self._validate_source_access_configurations(["BASIC_AUTH", "VIRTUAL_HOST"], "BASIC_AUTH")
document = {
"PolicyName": self._policy_name,
"PolicyDocument": {
"Statement": [
{
"Action": [
"secretsmanager:GetSecretValue",
],
"Effect": "Allow",
"Resource": basic_auth_uri,
},
{
"Action": [
"mq:DescribeBroker",
],
"Effect": "Allow",
"Resource": self.Broker,
},
]
},
}
if self.SecretsManagerKmsKeyId:
self.validate_secrets_manager_kms_key_id()
kms_policy = {
"Action": "kms:Decrypt",
"Effect": "Allow",
"Resource": {
"Fn::Sub": "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/"
+ self.SecretsManagerKmsKeyId
},
}
document["PolicyDocument"]["Statement"].append(kms_policy) # type: ignore[index]
return [document]
class SelfManagedKafka(PullEventSource):
"""
SelfManagedKafka event source
"""
resource_type = "SelfManagedKafka"
AUTH_MECHANISM = [
"SASL_SCRAM_256_AUTH",
"SASL_SCRAM_512_AUTH",
"BASIC_AUTH",
"CLIENT_CERTIFICATE_TLS_AUTH",
]
def get_event_source_arn(self) -> Optional[PassThrough]:
return None
def get_policy_arn(self) -> Optional[str]:
return None
def get_policy_statements(self) -> Optional[List[Dict[str, Any]]]:
if not self.KafkaBootstrapServers:
raise InvalidEventException(
self.relative_id,
"No KafkaBootstrapServers provided for self managed kafka as an event source",
)
if not self.Topics:
raise InvalidEventException(
self.relative_id,
"No Topics provided for self managed kafka as an event source",
)
if len(self.Topics) != 1:
raise InvalidEventException(
self.relative_id,
"Topics for self managed kafka only supports single configuration entry.",
)
if not self.SourceAccessConfigurations:
raise InvalidEventException(
self.relative_id,
"No SourceAccessConfigurations for self managed kafka event provided.",
)
document = self.generate_policy_document(self.SourceAccessConfigurations)
return [document]
def generate_policy_document(self, source_access_configurations: List[Any]): # type: ignore[no-untyped-def]
statements = []
authentication_uri, authentication_uri_2, has_vpc_config = self.get_secret_key(source_access_configurations)
if authentication_uri:
secret_manager = self.get_secret_manager_secret(authentication_uri) # type: ignore[no-untyped-call]
statements.append(secret_manager)
if authentication_uri_2:
secret_manager = self.get_secret_manager_secret(authentication_uri) # type: ignore[no-untyped-call]
statements.append(secret_manager)
if has_vpc_config:
vpc_permissions = self.get_vpc_permission()
statements.append(vpc_permissions)
if self.SecretsManagerKmsKeyId:
self.validate_secrets_manager_kms_key_id()
kms_policy = self._get_kms_decrypt_policy(self.SecretsManagerKmsKeyId)
statements.append(kms_policy)
return {
"PolicyDocument": {
"Statement": statements,
"Version": "2012-10-17",
},
"PolicyName": "SelfManagedKafkaExecutionRolePolicy",
}
def get_secret_key(self, source_access_configurations: List[Any]) -> Tuple[Optional[str], Optional[str], bool]:
authentication_uri = None
has_vpc_subnet = False
has_vpc_security_group = False
authentication_uri_2 = None
if not isinstance(source_access_configurations, list):
raise InvalidEventException(
self.relative_id,
"SourceAccessConfigurations for self managed kafka event should be a list.",
)
for config in source_access_configurations:
sam_expect(config, self.relative_id, "SourceAccessConfigurations").to_be_a_map()
if config.get("Type") == "VPC_SUBNET":
self.validate_uri(config.get("URI"), "VPC_SUBNET")
has_vpc_subnet = True
elif config.get("Type") == "VPC_SECURITY_GROUP":
self.validate_uri(config.get("URI"), "VPC_SECURITY_GROUP")
has_vpc_security_group = True
elif config.get("Type") in self.AUTH_MECHANISM:
if authentication_uri:
raise InvalidEventException(
self.relative_id,
"Multiple auth mechanism properties specified in SourceAccessConfigurations for self managed kafka event.",
)
self.validate_uri(config.get("URI"), "auth mechanism")
authentication_uri = config.get("URI")
elif config.get("Type") == "SERVER_ROOT_CA_CERTIFICATE":
self.validate_uri(config.get("URI"), "SERVER_ROOT_CA_CERTIFICATE")
authentication_uri_2 = config.get("URI")
else:
raise InvalidEventException(
self.relative_id,
"Invalid SourceAccessConfigurations Type provided for self managed kafka event.",
)
if (not has_vpc_subnet and has_vpc_security_group) or (has_vpc_subnet and not has_vpc_security_group):
raise InvalidEventException(
self.relative_id,
"VPC_SUBNET and VPC_SECURITY_GROUP in SourceAccessConfigurations for SelfManagedKafka must be both provided.",
)
return authentication_uri, authentication_uri_2, (has_vpc_subnet and has_vpc_security_group)
def validate_uri(self, uri: Optional[Any], msg: str) -> None:
if not uri:
raise InvalidEventException(
self.relative_id,
f"No {msg} URI property specified in SourceAccessConfigurations for self managed kafka event.",
)
if not isinstance(uri, str) and not is_intrinsic(uri):
raise InvalidEventException(
self.relative_id,
f"Wrong Type for {msg} URI property specified in SourceAccessConfigurations for self managed kafka event.",
)
def get_secret_manager_secret(self, authentication_uri): # type: ignore[no-untyped-def]
return {
"Action": ["secretsmanager:GetSecretValue"],
"Effect": "Allow",
"Resource": authentication_uri,
}
def get_vpc_permission(self) -> Dict[str, Any]:
return {
"Action": [
"ec2:CreateNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DeleteNetworkInterface",
"ec2:DescribeVpcs",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
],
"Effect": "Allow",
"Resource": "*",
}
@staticmethod
@deprecated()
def get_kms_policy(secrets_manager_kms_key_id: str) -> Dict[str, Any]:
return {
"Action": ["kms:Decrypt"],
"Effect": "Allow",
"Resource": {
"Fn::Sub": "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/"
+ secrets_manager_kms_key_id
},
}
class DocumentDB(PullEventSource):
"""DocumentDB event source."""
resource_type = "DocumentDB"
property_types: Dict[str, PropertyType] = {
**PullEventSource.property_types,
"Cluster": PassThroughProperty(True),
"DatabaseName": PassThroughProperty(True),
"CollectionName": PassThroughProperty(False),
"FullDocument": PassThroughProperty(False),
}
Cluster: PassThrough
DatabaseName: PassThrough
CollectionName: Optional[PassThrough]
FullDocument: Optional[PassThrough]
def add_extra_eventsourcemapping_fields(self, lambda_eventsourcemapping: LambdaEventSourceMapping) -> None:
lambda_eventsourcemapping.DocumentDBEventSourceConfig = {
"DatabaseName": self.DatabaseName,
}
if self.CollectionName:
lambda_eventsourcemapping.DocumentDBEventSourceConfig["CollectionName"] = self.CollectionName # type: ignore[attr-defined]
if self.FullDocument:
lambda_eventsourcemapping.DocumentDBEventSourceConfig["FullDocument"] = self.FullDocument # type: ignore[attr-defined]
def get_event_source_arn(self) -> Optional[PassThrough]:
return self.Cluster
def get_policy_arn(self) -> Optional[str]:
return None
def get_policy_statements(self) -> List[Dict[str, Any]]:
basic_auth_uri = self._validate_source_access_configurations(["BASIC_AUTH"], "BASIC_AUTH")
statements = [
{
"Action": [
"secretsmanager:GetSecretValue",
],
"Effect": "Allow",
"Resource": basic_auth_uri,
},
{
"Action": [
"rds:DescribeDBClusterParameters",
],
"Effect": "Allow",
"Resource": {"Fn::Sub": "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:cluster-pg:*"},
},
{
"Action": [
"rds:DescribeDBSubnetGroups",
],
"Effect": "Allow",
"Resource": {"Fn::Sub": "arn:${AWS::Partition}:rds:${AWS::Region}:${AWS::AccountId}:subgrp:*"},
},
{
"Action": [
"rds:DescribeDBClusters",
],
"Effect": "Allow",
"Resource": self.Cluster,
},
{
"Action": [
"ec2:CreateNetworkInterface",
"ec2:DescribeNetworkInterfaces",
"ec2:DeleteNetworkInterface",
"ec2:DescribeVpcs",
"ec2:DescribeSubnets",
"ec2:DescribeSecurityGroups",
],
"Effect": "Allow",
"Resource": "*",
},
]
if self.SecretsManagerKmsKeyId:
self.validate_secrets_manager_kms_key_id()
kms_policy = self._get_kms_decrypt_policy(self.SecretsManagerKmsKeyId)
statements.append(kms_policy)
document = {
"PolicyName": "SamAutoGeneratedDocumentDBPolicy",
"PolicyDocument": {"Statement": statements},
}
return [document]