samtranslator/model/stepfunctions/generators.py (267 lines of code) (raw):
import json
from copy import deepcopy
from typing import Any, Dict, List, Tuple
from samtranslator.metrics.method_decorator import cw_timer
from samtranslator.model.exceptions import InvalidEventException, InvalidResourceException
from samtranslator.model.iam import IAMRole, IAMRolePolicies
from samtranslator.model.intrinsics import fnJoin, is_intrinsic
from samtranslator.model.resource_policies import ResourcePolicies
from samtranslator.model.role_utils import construct_role_for_resource
from samtranslator.model.s3_utils.uri_parser import parse_s3_uri
from samtranslator.model.stepfunctions.resources import (
StepFunctionsStateMachine,
StepFunctionsStateMachineAlias,
StepFunctionsStateMachineVersion,
)
from samtranslator.model.tags.resource_tagging import get_tag_list
from samtranslator.model.xray_utils import get_xray_managed_policy_name
from samtranslator.utils.cfn_dynamic_references import is_dynamic_reference
class StateMachineGenerator:
_SAM_KEY = "stateMachine:createdBy"
_SAM_VALUE = "SAM"
_SUBSTITUTION_NAME_TEMPLATE = "definition_substitution_%s"
_SUBSTITUTION_KEY_TEMPLATE = "${definition_substitution_%s}"
SFN_INVALID_PROPERTY_BOTH_ROLE_POLICY = (
"Specify either 'Role' or 'Policies' (but not both at the same time) or neither of them"
)
def __init__( # type: ignore[no-untyped-def] # noqa: PLR0913
self,
logical_id,
depends_on,
managed_policy_map,
intrinsics_resolver,
definition,
definition_uri,
logging,
name,
policies,
permissions_boundary,
definition_substitutions,
role,
state_machine_type,
tracing,
events,
event_resources,
event_resolver,
role_path=None,
tags=None,
resource_attributes=None,
passthrough_resource_attributes=None,
get_managed_policy_map=None,
auto_publish_alias=None,
deployment_preference=None,
use_alias_as_event_target=None,
):
"""
Constructs an State Machine Generator class that generates a State Machine resource
:param logical_id: Logical id of the SAM State Machine Resource
:param depends_on: Any resources that need to be depended on
:param managed_policy_map: Map of managed policy names to the ARNs
:param intrinsics_resolver: Instance of the resolver that knows how to resolve parameter references
:param definition: State Machine definition
:param definition_uri: URI to State Machine definition
:param logging: Logging configuration for the State Machine
:param name: Name of the State Machine resource
:param policies: Policies attached to the execution role
:param permissions_boundary: The ARN of the policy used to set the permissions boundary for the role
:param definition_substitutions: Variable-to-value mappings to be replaced in the State Machine definition
:param role: Role ARN to use for the execution role
:param role_path: The file path of the execution role
:param state_machine_type: Type of the State Machine
:param tracing: Tracing configuration for the State Machine
:param events: List of event sources for the State Machine
:param event_resources: Event resources to link
:param event_resolver: Resolver that maps Event types to Event classes
:param tags: Tags to be associated with the State Machine resource
:param resource_attributes: Resource attributes to add to the State Machine resource
:param passthrough_resource_attributes: Attributes such as `Condition` that are added to derived resources
:param auto_publish_alias: Name of the state machine alias to automatically create and update
:deployment_preference: Settings to enable gradual state machine deployments
:param use_alias_as_event_target: Whether to use the state machine alias as the event target
"""
self.logical_id = logical_id
self.depends_on = depends_on
self.managed_policy_map = managed_policy_map
self.intrinsics_resolver = intrinsics_resolver
self.passthrough_resource_attributes = passthrough_resource_attributes
self.resource_attributes = resource_attributes
self.definition = definition
self.definition_uri = definition_uri
self.name = name
self.logging = logging
self.policies = policies
self.permissions_boundary = permissions_boundary
self.definition_substitutions = definition_substitutions
self.role = role
self.role_path = role_path
self.type = state_machine_type
self.tracing = tracing
self.events = events
self.event_resources = event_resources
self.event_resolver = event_resolver
self.tags = tags
self.state_machine = StepFunctionsStateMachine(
logical_id, depends_on=depends_on, attributes=resource_attributes
)
self.substitution_counter = 1
self.get_managed_policy_map = get_managed_policy_map
self.auto_publish_alias = auto_publish_alias
self.deployment_preference = deployment_preference
self.use_alias_as_event_target = use_alias_as_event_target
@cw_timer(prefix="Generator", name="StateMachine")
def to_cloudformation(self): # type: ignore[no-untyped-def]
"""
Constructs and returns the State Machine resource and any additional resources associated with it.
:returns: a list of resources including the State Machine resource.
:rtype: list
"""
resources: List[Any] = [self.state_machine]
# Defaulting to {} will add the DefinitionSubstitutions field on the transform output even when it is not relevant
if self.definition_substitutions:
self.state_machine.DefinitionSubstitutions = self.definition_substitutions
if self.definition and self.definition_uri:
raise InvalidResourceException(
self.logical_id, "Specify either 'Definition' or 'DefinitionUri' property and not both."
)
if self.definition:
processed_definition = deepcopy(self.definition)
substitutions = self._replace_dynamic_values_with_substitutions(processed_definition) # type: ignore[no-untyped-call]
if len(substitutions) > 0:
if self.state_machine.DefinitionSubstitutions:
self.state_machine.DefinitionSubstitutions.update(substitutions)
else:
self.state_machine.DefinitionSubstitutions = substitutions
self.state_machine.DefinitionString = self._build_definition_string(processed_definition) # type: ignore[no-untyped-call]
elif self.definition_uri:
self.state_machine.DefinitionS3Location = self._construct_definition_uri()
else:
raise InvalidResourceException(
self.logical_id, "Either 'Definition' or 'DefinitionUri' property must be specified."
)
if self.role and self.policies:
raise InvalidResourceException(self.logical_id, self.SFN_INVALID_PROPERTY_BOTH_ROLE_POLICY)
if self.role:
self.state_machine.RoleArn = self.role
else:
if not self.policies:
self.policies = []
execution_role = self._construct_role()
self.state_machine.RoleArn = execution_role.get_runtime_attr("arn")
resources.append(execution_role)
self.state_machine.StateMachineName = self.name
self.state_machine.StateMachineType = self.type
self.state_machine.LoggingConfiguration = self.logging
self.state_machine.TracingConfiguration = self.tracing
self.state_machine.Tags = self._construct_tag_list()
managed_traffic_shifting_resources = self._generate_managed_traffic_shifting_resources()
resources.extend(managed_traffic_shifting_resources)
event_resources = self._generate_event_resources()
resources.extend(event_resources)
return resources
def _construct_definition_uri(self) -> Dict[str, Any]:
"""
Constructs the State Machine's `DefinitionS3 property`_, from the SAM State Machines's DefinitionUri property.
:returns: a DefinitionUri dict, containing the S3 Bucket, Key, and Version of the State Machine definition.
:rtype: dict
"""
if isinstance(self.definition_uri, dict):
if not self.definition_uri.get("Bucket", None) or not self.definition_uri.get("Key", None):
# DefinitionUri is a dictionary but does not contain Bucket or Key property
raise InvalidResourceException(
self.logical_id, "'DefinitionUri' requires Bucket and Key properties to be specified."
)
s3_pointer = self.definition_uri
else:
# DefinitionUri is a string
parsed_s3_pointer = parse_s3_uri(self.definition_uri)
if parsed_s3_pointer is None:
raise InvalidResourceException(
self.logical_id,
"'DefinitionUri' is not a valid S3 Uri of the form "
"'s3://bucket/key' with optional versionId query parameter.",
)
s3_pointer = parsed_s3_pointer
definition_s3 = {"Bucket": s3_pointer["Bucket"], "Key": s3_pointer["Key"]}
if "Version" in s3_pointer:
definition_s3["Version"] = s3_pointer["Version"]
return definition_s3
def _build_definition_string(self, definition_dict): # type: ignore[no-untyped-def]
"""
Builds a CloudFormation definition string from a definition dictionary. The definition string constructed is
a Fn::Join intrinsic function to make it readable.
:param definition_dict: State machine definition as a dictionary
:returns: the state machine definition.
:rtype: dict
"""
# Indenting and then splitting the JSON-encoded string for readability of the state machine definition in the CloudFormation translated resource.
# Separators are passed explicitly to maintain trailing whitespace consistency across Py2 and Py3
definition_lines = json.dumps(definition_dict, sort_keys=True, indent=4, separators=(",", ": ")).split("\n")
return fnJoin("\n", definition_lines)
def _construct_role(self) -> IAMRole:
"""
Constructs a State Machine execution role based on this SAM State Machine's Policies property.
:returns: the generated IAM Role
:rtype: model.iam.IAMRole
"""
policies = self.policies[:]
if self.tracing and self.tracing.get("Enabled") is True:
policies.append(get_xray_managed_policy_name())
state_machine_policies = ResourcePolicies(
{"Policies": policies},
# No support for policy templates in the "core"
policy_template_processor=None,
)
return construct_role_for_resource(
resource_logical_id=self.logical_id,
role_path=self.role_path,
attributes=self.passthrough_resource_attributes,
managed_policy_map=self.managed_policy_map,
assume_role_policy_document=IAMRolePolicies.stepfunctions_assume_role_policy(),
resource_policies=state_machine_policies,
tags=self._construct_tag_list(),
permissions_boundary=self.permissions_boundary,
get_managed_policy_map=self.get_managed_policy_map,
)
def _construct_tag_list(self) -> List[Dict[str, Any]]:
"""
Transforms the SAM defined Tags into the form CloudFormation is expecting.
:returns: List of Tag Dictionaries
:rtype: list
"""
sam_tag = {self._SAM_KEY: self._SAM_VALUE}
return get_tag_list(sam_tag) + get_tag_list(self.tags)
def _construct_version(self) -> StepFunctionsStateMachineVersion:
"""Constructs a state machine version resource that will be auto-published when the revision id of the state machine changes.
:return: Step Functions state machine version resource
"""
# Unlike Lambda function versions, state machine versions do not need a hash suffix because
# they are always replaced when their corresponding state machine is updated.
# I.e. A SAM StateMachine resource will never have multiple version resources at the same time.
logical_id = f"{self.logical_id}Version"
attributes = self.passthrough_resource_attributes.copy()
# Both UpdateReplacePolicy and DeletionPolicy are needed to protect previous version from deletion
# to ensure gradual deployment works.
if "DeletionPolicy" not in attributes:
attributes["DeletionPolicy"] = "Retain"
if "UpdateReplacePolicy" not in attributes:
attributes["UpdateReplacePolicy"] = "Retain"
state_machine_version = StepFunctionsStateMachineVersion(logical_id=logical_id, attributes=attributes)
state_machine_version.StateMachineArn = self.state_machine.get_runtime_attr("arn")
state_machine_version.StateMachineRevisionId = self.state_machine.get_runtime_attr("state_machine_revision_id")
return state_machine_version
def _construct_alias(self, version: StepFunctionsStateMachineVersion) -> StepFunctionsStateMachineAlias:
"""Constructs a state machine alias resource pointing to the given state machine version.
:return: Step Functions state machine alias resource
"""
logical_id = f"{self.logical_id}Alias{self.auto_publish_alias}"
attributes = self.passthrough_resource_attributes
state_machine_alias = StepFunctionsStateMachineAlias(logical_id=logical_id, attributes=attributes)
state_machine_alias.Name = self.auto_publish_alias
state_machine_version_arn = version.get_runtime_attr("arn")
deployment_preference = {}
if self.deployment_preference:
deployment_preference = self.deployment_preference
else:
deployment_preference["Type"] = "ALL_AT_ONCE"
deployment_preference["StateMachineVersionArn"] = state_machine_version_arn
state_machine_alias.DeploymentPreference = deployment_preference
self.state_machine_alias = state_machine_alias
return state_machine_alias
def _generate_managed_traffic_shifting_resources(
self,
) -> List[Any]:
"""Generates and returns the version and alias resources associated with this state machine's managed traffic shifting.
:returns: a list containing the state machine's version and alias resources
:rtype: list
"""
if not self.auto_publish_alias and self.use_alias_as_event_target:
raise InvalidResourceException(
self.logical_id, "'UseAliasAsEventTarget' requires 'AutoPublishAlias' property to be specified."
)
if not self.auto_publish_alias and not self.deployment_preference:
return []
if not self.auto_publish_alias and self.deployment_preference:
raise InvalidResourceException(
self.logical_id, "'DeploymentPreference' requires 'AutoPublishAlias' property to be specified."
)
state_machine_version = self._construct_version()
return [state_machine_version, self._construct_alias(state_machine_version)]
def _generate_event_resources(self) -> List[Dict[str, Any]]:
"""Generates and returns the resources associated with this state machine's event sources.
:returns: a list containing the state machine's event resources
:rtype: list
"""
resources = []
if self.events:
for logical_id, event_dict in self.events.items():
kwargs = {
"intrinsics_resolver": self.intrinsics_resolver,
"permissions_boundary": self.permissions_boundary,
}
try:
eventsource = self.event_resolver.resolve_resource_type(event_dict).from_dict(
self.state_machine.logical_id + logical_id, event_dict, logical_id
)
for name, resource in self.event_resources[logical_id].items():
kwargs[name] = resource
except (TypeError, AttributeError) as e:
raise InvalidEventException(logical_id, str(e)) from e
target_resource = (
(self.state_machine_alias or self.state_machine)
if self.use_alias_as_event_target
else self.state_machine
)
resources += eventsource.to_cloudformation(resource=target_resource, **kwargs)
return resources
def _replace_dynamic_values_with_substitutions(self, _input): # type: ignore[no-untyped-def]
"""
Replaces the CloudFormation instrinsic functions and dynamic references within the input with substitutions.
:param _input: Input dictionary in which the dynamic values need to be replaced with substitutions
:returns: List of substitution to dynamic value mappings
:rtype: dict
"""
substitution_map = {}
for path in self._get_paths_to_intrinsics(_input): # type: ignore[no-untyped-call]
location = _input
for step in path[:-1]:
location = location[step]
sub_name, sub_key = self._generate_substitution()
substitution_map[sub_name] = location[path[-1]]
location[path[-1]] = sub_key
return substitution_map
def _get_paths_to_intrinsics(self, _input, path=None): # type: ignore[no-untyped-def]
"""
Returns all paths to dynamic values within a dictionary
:param _input: Input dictionary to find paths to dynamic values in
:param path: Optional list to keep track of the path to the input dictionary
:returns list: List of keys that defines the path to a dynamic value within the input dictionary
"""
if path is None:
path = []
dynamic_value_paths = [] # type: ignore[var-annotated]
if isinstance(_input, dict):
iterator = _input.items()
elif isinstance(_input, list):
iterator = enumerate(_input) # type: ignore[assignment]
else:
return dynamic_value_paths
for key, value in sorted(iterator, key=lambda item: item[0]): # type: ignore[no-any-return]
if is_intrinsic(value) or is_dynamic_reference(value):
dynamic_value_paths.append([*path, key])
elif isinstance(value, (dict, list)):
dynamic_value_paths.extend(self._get_paths_to_intrinsics(value, [*path, key])) # type: ignore[no-untyped-call]
return dynamic_value_paths
def _generate_substitution(self) -> Tuple[str, str]:
"""
Generates a name and key for a new substitution.
:returns: Substitution name and key
:rtype: string, string
"""
substitution_name = self._SUBSTITUTION_NAME_TEMPLATE % self.substitution_counter
substitution_key = self._SUBSTITUTION_KEY_TEMPLATE % self.substitution_counter
self.substitution_counter += 1
return substitution_name, substitution_key