samcli/lib/sync/sync_flow_factory.py (301 lines of code) (raw):
"""SyncFlow Factory for creating SyncFlows based on resource types"""
import logging
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, cast
from botocore.exceptions import ClientError
from samcli.commands.build.build_context import BuildContext
from samcli.commands.exceptions import InvalidStackNameException
from samcli.lib.bootstrap.nested_stack.nested_stack_manager import NestedStackManager
from samcli.lib.build.app_builder import ApplicationBuildResult
from samcli.lib.package.utils import is_local_folder, is_zip_file
from samcli.lib.providers.provider import Function, FunctionBuildInfo, ResourceIdentifier, Stack
from samcli.lib.sync.flows.auto_dependency_layer_sync_flow import AutoDependencyLayerParentSyncFlow
from samcli.lib.sync.flows.function_sync_flow import FunctionSyncFlow
from samcli.lib.sync.flows.http_api_sync_flow import HttpApiSyncFlow
from samcli.lib.sync.flows.image_function_sync_flow import ImageFunctionSyncFlow
from samcli.lib.sync.flows.layer_sync_flow import (
LayerSyncFlow,
LayerSyncFlowSkipBuildDirectory,
LayerSyncFlowSkipBuildZipFile,
)
from samcli.lib.sync.flows.rest_api_sync_flow import RestApiSyncFlow
from samcli.lib.sync.flows.stepfunctions_sync_flow import StepFunctionsSyncFlow
from samcli.lib.sync.flows.zip_function_sync_flow import (
ZipFunctionSyncFlow,
ZipFunctionSyncFlowSkipBuildDirectory,
ZipFunctionSyncFlowSkipBuildZipFile,
)
from samcli.lib.sync.sync_flow import SyncFlow
from samcli.lib.utils.boto_utils import (
get_boto_client_provider_with_config,
get_boto_resource_provider_with_config,
get_client_error_code,
)
from samcli.lib.utils.cloudformation import get_resource_summaries
from samcli.lib.utils.packagetype import IMAGE, ZIP
from samcli.lib.utils.resource_type_based_factory import ResourceTypeBasedFactory
from samcli.lib.utils.resources import (
AWS_APIGATEWAY_RESTAPI,
AWS_APIGATEWAY_V2_API,
AWS_LAMBDA_FUNCTION,
AWS_LAMBDA_LAYERVERSION,
AWS_SERVERLESS_API,
AWS_SERVERLESS_FUNCTION,
AWS_SERVERLESS_HTTPAPI,
AWS_SERVERLESS_LAYERVERSION,
AWS_SERVERLESS_STATEMACHINE,
AWS_STEPFUNCTIONS_STATEMACHINE,
)
if TYPE_CHECKING: # pragma: no cover
from samcli.commands.deploy.deploy_context import DeployContext
from samcli.commands.sync.sync_context import SyncContext
# Module-level logger, named after this module
LOG = logging.getLogger(__name__)
class SyncCodeResources:
    """
    A class that records the supported resource types that can perform sync --code
    """

    _accepted_resources = [
        AWS_SERVERLESS_FUNCTION,
        AWS_LAMBDA_FUNCTION,
        AWS_SERVERLESS_LAYERVERSION,
        AWS_LAMBDA_LAYERVERSION,
        AWS_SERVERLESS_API,
        AWS_APIGATEWAY_RESTAPI,
        AWS_SERVERLESS_HTTPAPI,
        AWS_APIGATEWAY_V2_API,
        AWS_SERVERLESS_STATEMACHINE,
        AWS_STEPFUNCTIONS_STATEMACHINE,
    ]

    @classmethod
    def values(cls) -> List[str]:
        """
        A class getter to retrieve the accepted resource list

        Returns
        -------
        List[str]
            A new list holding the accepted resource types, in declaration order
        """
        # Hand out a copy so that a caller mutating the result cannot alter the
        # class-level list shared by every other caller.
        return list(cls._accepted_resources)
class SyncFlowFactory(ResourceTypeBasedFactory[SyncFlow]):  # pylint: disable=E1136
    """Factory class for SyncFlow
    Creates appropriate SyncFlow types based on stack resource types
    """

    _deploy_context: "DeployContext"
    _build_context: "BuildContext"
    _sync_context: "SyncContext"
    _physical_id_mapping: Dict[str, str]
    _auto_dependency_layer: bool

    def __init__(
        self,
        build_context: "BuildContext",
        deploy_context: "DeployContext",
        sync_context: "SyncContext",
        stacks: List[Stack],
        auto_dependency_layer: bool,
    ) -> None:
        """
        Parameters
        ----------
        build_context : BuildContext
            BuildContext to be passed into each individual SyncFlow
        deploy_context : DeployContext
            DeployContext to be passed into each individual SyncFlow
        sync_context: SyncContext
            SyncContext object that obtains sync information.
        stacks : List[Stack]
            List of stacks containing a root stack and optional nested ones
        auto_dependency_layer : bool
            When True, buildable ZIP functions whose runtime is supported by
            NestedStackManager are synced with AutoDependencyLayerParentSyncFlow
            instead of ZipFunctionSyncFlow
        """
        super().__init__(stacks)
        self._deploy_context = deploy_context
        self._build_context = build_context
        self._sync_context = sync_context
        self._auto_dependency_layer = auto_dependency_layer
        # Stays empty until load_physical_id_mapping() is called
        self._physical_id_mapping = {}

    def load_physical_id_mapping(self) -> None:
        """Load physical IDs of the stack resources from remote

        Raises
        ------
        InvalidStackNameException
            When fetching the resource summaries fails with a "ValidationError" client error code
        ClientError
            Any other client error is re-raised unchanged
        """
        LOG.debug("Loading physical ID mapping")
        resource_provider = get_boto_resource_provider_with_config(
            region=self._deploy_context.region, profile=self._deploy_context.profile
        )
        client_provider = get_boto_client_provider_with_config(
            region=self._deploy_context.region, profile=self._deploy_context.profile
        )

        try:
            resource_mapping = get_resource_summaries(
                boto_resource_provider=resource_provider,
                boto_client_provider=client_provider,
                stack_name=self._deploy_context.stack_name,
            )
        except ClientError as ex:
            error_code = get_client_error_code(ex)
            if error_code == "ValidationError":
                raise InvalidStackNameException(
                    f"Invalid --stack-name parameter. Stack with id '{self._deploy_context.stack_name}' does not exist"
                ) from ex
            # Bare raise re-raises the active exception as-is
            raise

        # get the resource_id -> physical_id mapping
        self._physical_id_mapping = {
            resource_id: summary.physical_resource_id for resource_id, summary in resource_mapping.items()
        }

    def _shared_flow_args(self, resource_identifier: ResourceIdentifier) -> tuple:
        """Return the leading positional arguments that every SyncFlow constructor in this factory receives

        The order is: stringified resource identifier, build context, deploy context,
        sync context, physical id mapping and stacks.
        """
        return (
            str(resource_identifier),
            self._build_context,
            self._deploy_context,
            self._sync_context,
            self._physical_id_mapping,
            self._stacks,
        )

    def _create_lambda_flow(
        self,
        resource_identifier: ResourceIdentifier,
        application_build_result: Optional[ApplicationBuildResult],
    ) -> Optional[FunctionSyncFlow]:
        """Create the sync flow for a function resource, dispatching on its package type

        Returns None when the function can't be found in the build context's function
        provider, or when its package type is neither ZIP nor IMAGE.
        """
        function = self._build_context.function_provider.get(str(resource_identifier))
        if not function:
            LOG.warning("Can't find function resource with '%s' logical id", str(resource_identifier))
            return None

        if function.packagetype == ZIP:
            return self._create_zip_type_lambda_flow(resource_identifier, application_build_result, function)
        if function.packagetype == IMAGE:
            return self._create_image_type_lambda_flow(resource_identifier, application_build_result, function)
        return None

    def _create_zip_type_lambda_flow(
        self,
        resource_identifier: ResourceIdentifier,
        application_build_result: Optional[ApplicationBuildResult],
        function: Function,
    ) -> Optional[FunctionSyncFlow]:
        """Create the sync flow for a ZIP package type function

        Non-buildable functions are handled by their build info: InlineCode yields None,
        PreZipped and SkipBuild get their dedicated skip-build flows. Everything else gets
        AutoDependencyLayerParentSyncFlow (when enabled and the runtime is supported) or
        ZipFunctionSyncFlow.
        """
        flow_args = self._shared_flow_args(resource_identifier)
        build_info = function.function_build_info

        if not build_info.is_buildable():
            if build_info == FunctionBuildInfo.InlineCode:
                LOG.debug(
                    "No need to create sync flow for a function with InlineCode '%s' resource", str(resource_identifier)
                )
                return None
            if build_info == FunctionBuildInfo.PreZipped:
                # if codeuri points to zip file, use ZipFunctionSyncFlowSkipBuildZipFile sync flow
                LOG.debug("Creating ZipFunctionSyncFlowSkipBuildZipFile for '%s' resource", resource_identifier)
                return ZipFunctionSyncFlowSkipBuildZipFile(*flow_args, application_build_result)
            if build_info == FunctionBuildInfo.SkipBuild:
                # if function is marked with SkipBuild, use ZipFunctionSyncFlowSkipBuildDirectory sync flow
                LOG.debug("Creating ZipFunctionSyncFlowSkipBuildDirectory for '%s' resource", resource_identifier)
                return ZipFunctionSyncFlowSkipBuildDirectory(*flow_args, application_build_result)

        # only return auto dependency layer sync if runtime is supported
        if self._auto_dependency_layer and NestedStackManager.is_runtime_supported(function.runtime):
            return AutoDependencyLayerParentSyncFlow(*flow_args, application_build_result)

        return ZipFunctionSyncFlow(*flow_args, application_build_result)

    def _create_image_type_lambda_flow(
        self,
        resource_identifier: ResourceIdentifier,
        application_build_result: Optional[ApplicationBuildResult],
        function: Function,
    ) -> Optional[FunctionSyncFlow]:
        """Create an ImageFunctionSyncFlow for an IMAGE package type function

        Returns None (after logging a warning) when the function is not buildable.
        """
        if not function.function_build_info.is_buildable():
            LOG.warning("Can't build image type function with '%s' logical id", str(resource_identifier))
            return None

        return ImageFunctionSyncFlow(*self._shared_flow_args(resource_identifier), application_build_result)

    def _create_layer_flow(
        self,
        resource_identifier: ResourceIdentifier,
        application_build_result: Optional[ApplicationBuildResult],
    ) -> Optional[SyncFlow]:
        """Create the sync flow for a layer resource

        Buildable layers get LayerSyncFlow. Otherwise the flow is chosen by what the layer's
        codeuri points to: a local folder or a zip file. Returns None when the layer can't be
        found or none of these cases apply.
        """
        layer = self._build_context.layer_provider.get(str(resource_identifier))
        if not layer:
            LOG.warning("Can't find layer resource with '%s' logical id", str(resource_identifier))
            return None

        flow_args = self._shared_flow_args(resource_identifier)

        if BuildContext.is_layer_buildable(layer):
            return LayerSyncFlow(*flow_args, application_build_result)

        if is_local_folder(layer.codeuri):
            LOG.debug("Creating LayerSyncFlowSkipBuildDirectory for '%s' resource", resource_identifier)
            return LayerSyncFlowSkipBuildDirectory(*flow_args, application_build_result)

        if is_zip_file(layer.codeuri):
            LOG.debug("Creating LayerSyncFlowSkipBuildZipFile for '%s' resource", resource_identifier)
            return LayerSyncFlowSkipBuildZipFile(*flow_args, application_build_result)

        LOG.warning("Can't create sync flow for '%s' layer resource", resource_identifier)
        return None

    def _create_rest_api_flow(
        self,
        resource_identifier: ResourceIdentifier,
        application_build_result: Optional[ApplicationBuildResult],
    ) -> SyncFlow:
        """Create a RestApiSyncFlow; application_build_result is accepted but not used"""
        return RestApiSyncFlow(*self._shared_flow_args(resource_identifier))

    def _create_api_flow(
        self,
        resource_identifier: ResourceIdentifier,
        application_build_result: Optional[ApplicationBuildResult],
    ) -> SyncFlow:
        """Create an HttpApiSyncFlow; application_build_result is accepted but not used"""
        return HttpApiSyncFlow(*self._shared_flow_args(resource_identifier))

    def _create_stepfunctions_flow(
        self,
        resource_identifier: ResourceIdentifier,
        application_build_result: Optional[ApplicationBuildResult],
    ) -> Optional[SyncFlow]:
        """Create a StepFunctionsSyncFlow; application_build_result is accepted but not used"""
        return StepFunctionsSyncFlow(*self._shared_flow_args(resource_identifier))

    # Signature shared by the creation functions above; they are stored as plain
    # functions in the mapping, so the factory instance is passed explicitly.
    GeneratorFunction = Callable[
        ["SyncFlowFactory", ResourceIdentifier, Optional[ApplicationBuildResult]], Optional[SyncFlow]
    ]

    # SyncFlow mapping between resource type and creation function
    GENERATOR_MAPPING: Dict[str, GeneratorFunction] = {
        AWS_LAMBDA_FUNCTION: _create_lambda_flow,
        AWS_SERVERLESS_FUNCTION: _create_lambda_flow,
        AWS_SERVERLESS_LAYERVERSION: _create_layer_flow,
        AWS_LAMBDA_LAYERVERSION: _create_layer_flow,
        AWS_SERVERLESS_API: _create_rest_api_flow,
        AWS_APIGATEWAY_RESTAPI: _create_rest_api_flow,
        AWS_SERVERLESS_HTTPAPI: _create_api_flow,
        AWS_APIGATEWAY_V2_API: _create_api_flow,
        AWS_SERVERLESS_STATEMACHINE: _create_stepfunctions_flow,
        AWS_STEPFUNCTIONS_STATEMACHINE: _create_stepfunctions_flow,
    }

    # Ignoring no-self-use as PyLint has a bug with Generic Abstract Classes
    def _get_generator_mapping(self) -> Dict[str, GeneratorFunction]:  # pylint: disable=no-self-use
        """Return the resource type -> creation function mapping of this factory"""
        return SyncFlowFactory.GENERATOR_MAPPING

    def create_sync_flow(
        self, resource_identifier: ResourceIdentifier, application_build_result: Optional[ApplicationBuildResult] = None
    ) -> Optional[SyncFlow]:
        """Create the SyncFlow for the given resource

        Parameters
        ----------
        resource_identifier : ResourceIdentifier
            Identifier of the resource to create a SyncFlow for
        application_build_result : Optional[ApplicationBuildResult]
            Build result handed to the creation function; defaults to None

        Returns
        -------
        Optional[SyncFlow]
            The created SyncFlow, or None when no creation function is found for the
            resource or the creation function itself returns None
        """
        # _get_generator_function is not defined in this class; presumably inherited from
        # ResourceTypeBasedFactory and resolved through _get_generator_mapping - TODO confirm
        generator = self._get_generator_function(resource_identifier)
        if not generator:
            return None
        return cast(SyncFlowFactory.GeneratorFunction, generator)(self, resource_identifier, application_build_result)