"""SyncFlow for ZIP based Lambda Functions"""

import base64
import hashlib
import logging
import os
import shutil
import tempfile
import uuid
from contextlib import ExitStack
from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast

from samcli.lib.build.app_builder import ApplicationBuilder, ApplicationBuildResult
from samcli.lib.build.build_graph import BuildGraph
from samcli.lib.package.s3_uploader import S3Uploader
from samcli.lib.package.utils import make_zip_with_lambda_permissions
from samcli.lib.providers.provider import Stack
from samcli.lib.sync.flows.function_sync_flow import FunctionSyncFlow, wait_for_function_update_complete
from samcli.lib.sync.sync_flow import ApiCallTypes, ResourceAPICall
from samcli.lib.utils.colors import Colored
from samcli.lib.utils.hash import file_checksum
from samcli.lib.utils.osutils import rmtree_if_exists

if TYPE_CHECKING:  # pragma: no cover
from samcli.commands.build.build_context import BuildContext
from samcli.commands.deploy.deploy_context import DeployContext
from samcli.commands.sync.sync_context import SyncContext

LOG = logging.getLogger(__name__)

MAXIMUM_FUNCTION_ZIP_SIZE = 50 * 1024 * 1024  # 50MB limit for Lambda direct ZIP upload


class ZipFunctionSyncFlow(FunctionSyncFlow):
"""SyncFlow for ZIP based functions"""
_s3_client: Any
_artifact_folder: Optional[str]
_zip_file: Optional[str]
_build_graph: Optional[BuildGraph]
def __init__(
self,
function_identifier: str,
build_context: "BuildContext",
deploy_context: "DeployContext",
sync_context: "SyncContext",
physical_id_mapping: Dict[str, str],
stacks: List[Stack],
application_build_result: Optional[ApplicationBuildResult],
):
"""
Parameters
----------
        function_identifier : str
            ZIP function resource identifier that needs to be synced.
        build_context : BuildContext
            BuildContext used for build related parameters
        deploy_context : DeployContext
            DeployContext used for deploy related parameters
        sync_context : SyncContext
            SyncContext object that obtains sync information.
        physical_id_mapping : Dict[str, str]
            Mapping between resource logical IDs and their physical IDs
        stacks : List[Stack]
            List of stacks for the application
        application_build_result : Optional[ApplicationBuildResult]
            Pre-built ApplicationBuildResult which can be re-used across SyncFlows
"""
super().__init__(
function_identifier,
build_context,
deploy_context,
sync_context,
physical_id_mapping,
stacks,
application_build_result,
)
self._s3_client = None
self._artifact_folder = None
self._zip_file = None
        self._build_graph = None
        self._local_sha = None
self._color = Colored()

    def set_up(self) -> None:
super().set_up()
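        # The S3 client is only needed when the packaged ZIP exceeds the direct upload size limit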
self._s3_client = self._boto_client("s3")

    def gather_resources(self) -> None:
        """Build the function and ZIP it into a temp file, storing the path in self._zip_file"""
if self._application_build_result:
LOG.debug("Using pre-built resources for function %s", self._function_identifier)
self._use_prebuilt_resources(self._application_build_result)
else:
LOG.debug("Building function from scratch %s", self._function_identifier)
self._build_resources_from_scratch()
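
        # Use a unique temp file name so that concurrent sync flows don't collide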
        zip_file_path = os.path.join(tempfile.gettempdir(), f"data-{uuid.uuid4().hex}")
self._zip_file = make_zip_with_lambda_permissions(zip_file_path, self._artifact_folder)
LOG.debug("%sCreated artifact ZIP file: %s", self.log_prefix, self._zip_file)
self._local_sha = file_checksum(cast(str, self._zip_file), hashlib.sha256())

    def _use_prebuilt_resources(self, application_build_result: ApplicationBuildResult) -> None:
        """Uses pre-built artifacts and assigns _build_graph and _artifact_folder"""
self._build_graph = application_build_result.build_graph
self._artifact_folder = application_build_result.artifacts.get(self._function_identifier)

    def _build_resources_from_scratch(self) -> None:
        """Builds the function from scratch and assigns _build_graph and _artifact_folder"""
with ExitStack() as exit_stack:
if self.has_locks():
exit_stack.enter_context(self._get_lock_chain())
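            # Clear leftover artifacts from previous builds of this function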
rmtree_if_exists(self._function.get_build_dir(self._build_context.build_dir))
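            # Build only the resources required for this function, re-using the build cache where possible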
builder = ApplicationBuilder(
self._build_context.collect_build_resources(self._function_identifier),
self._build_context.build_dir,
self._build_context.base_dir,
self._build_context.cache_dir,
cached=True,
is_building_specific_resource=True,
manifest_path_override=self._build_context.manifest_path_override,
container_manager=self._build_context.container_manager,
mode=self._build_context.mode,
combine_dependencies=self._combine_dependencies(),
build_in_source=self._build_context.build_in_source,
)
LOG.debug("%sBuilding Function", self.log_prefix)
build_result = builder.build()
self._build_graph = build_result.build_graph
self._artifact_folder = build_result.artifacts.get(self._function_identifier)

    def compare_remote(self) -> bool:
remote_info = self._lambda_client.get_function(FunctionName=self.get_physical_id(self._function_identifier))
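        # Lambda reports CodeSha256 base64 encoded; convert it to hex to match file_checksum's output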
remote_sha = base64.b64decode(remote_info["Configuration"]["CodeSha256"]).hex()
LOG.debug("%sLocal SHA: %s Remote SHA: %s", self.log_prefix, self._local_sha, remote_sha)
return self._local_sha == remote_sha

    def sync(self) -> None:
if not self._zip_file:
LOG.debug("%sSkipping Sync. ZIP file is None.", self.log_prefix)
return

        zip_file_size = os.path.getsize(self._zip_file)
if zip_file_size < MAXIMUM_FUNCTION_ZIP_SIZE:
# Direct upload through Lambda API
LOG.debug("%sUploading Function Directly", self.log_prefix)
with open(self._zip_file, "rb") as zip_file:
data = zip_file.read()
with ExitStack() as exit_stack:
if self.has_locks():
exit_stack.enter_context(self._get_lock_chain())
self._lambda_client.update_function_code(
FunctionName=self.get_physical_id(self._function_identifier), ZipFile=data
)
                    # We need to wait for the cloud side update to finish. Otherwise, even after
                    # this call returns and the lock chain is released, another update to the
                    # same function could still race with it on the cloud side.
wait_for_function_update_complete(
self._lambda_client, self.get_physical_id(self._function_identifier)
)
else:
# Upload to S3 first for oversized ZIPs
LOG.debug("%sUploading Function Through S3", self.log_prefix)
uploader = S3Uploader(
s3_client=self._s3_client,
bucket_name=self._deploy_context.s3_bucket,
prefix=self._deploy_context.s3_prefix,
kms_key_id=self._deploy_context.kms_key_id,
force_upload=True,
no_progressbar=True,
)
s3_url = uploader.upload_with_dedup(self._zip_file)
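            # Strip the "s3://" scheme and bucket name from the URL to get the object key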
s3_key = s3_url[5:].split("/", 1)[1]
with ExitStack() as exit_stack:
if self.has_locks():
exit_stack.enter_context(self._get_lock_chain())
self._lambda_client.update_function_code(
FunctionName=self.get_physical_id(self._function_identifier),
S3Bucket=self._deploy_context.s3_bucket,
S3Key=s3_key,
)
                # We need to wait for the cloud side update to finish. Otherwise, even after
                # this call returns and the lock chain is released, another update to the
                # same function could still race with it on the cloud side.
wait_for_function_update_complete(self._lambda_client, self.get_physical_id(self._function_identifier))
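
        # Clean up the temporary ZIP file once the update has been submitted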
if os.path.exists(self._zip_file):
os.remove(self._zip_file)

    def _get_resource_api_calls(self) -> List[ResourceAPICall]:
        resource_calls: List[ResourceAPICall] = []
resource_calls.extend(self._get_layers_api_calls())
resource_calls.extend(self._get_codeuri_api_calls())
resource_calls.extend(self._get_function_api_calls())
return resource_calls

    def _get_layers_api_calls(self) -> List[ResourceAPICall]:
        layer_api_calls = []
for layer in self._function.layers:
layer_api_calls.append(ResourceAPICall(layer.full_path, [ApiCallTypes.BUILD]))
return layer_api_calls

    def _get_codeuri_api_calls(self) -> List[ResourceAPICall]:
        codeuri_api_calls = []
        if self._function.codeuri:
            codeuri_api_calls.append(ResourceAPICall(self._function.codeuri, [ApiCallTypes.BUILD]))
        return codeuri_api_calls

    def _get_function_api_calls(self) -> List[ResourceAPICall]:
        # We need to acquire a lock for both API call types, since UPDATE_FUNCTION_CODE and
        # UPDATE_FUNCTION_CONFIGURATION on the same function conflict on the cloud side and
        # cannot take place in parallel
return [
ResourceAPICall(
self._function_identifier,
[ApiCallTypes.UPDATE_FUNCTION_CODE, ApiCallTypes.UPDATE_FUNCTION_CONFIGURATION],
)
]

    @staticmethod
def _combine_dependencies() -> bool:
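        # ZIP based functions bundle their dependencies together with the source code in a single artifact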
return True


class ZipFunctionSyncFlowSkipBuildZipFile(ZipFunctionSyncFlow):
"""
Alternative implementation for ZipFunctionSyncFlow, which uses pre-built zip file for running sync flow
"""
def gather_resources(self) -> None:
self._zip_file = os.path.join(tempfile.gettempdir(), f"data-{uuid.uuid4().hex}")
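        # CodeUri already points at a packaged ZIP file, so copy it instead of building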
shutil.copy2(cast(str, self._function.codeuri), self._zip_file)
LOG.debug("%sCreated artifact ZIP file: %s", self.log_prefix, self._zip_file)
self._local_sha = file_checksum(self._zip_file, hashlib.sha256())


class ZipFunctionSyncFlowSkipBuildDirectory(ZipFunctionSyncFlow):
"""
Alternative implementation for ZipFunctionSyncFlow, which doesn't build function but zips folder directly
since function is annotated with SkipBuild inside its Metadata
"""
def gather_resources(self) -> None:
zip_file_path = os.path.join(tempfile.gettempdir(), f"data-{uuid.uuid4().hex}")
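        # SkipBuild is set in the function's Metadata, so zip the code directory as-is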
self._zip_file = make_zip_with_lambda_permissions(zip_file_path, self._function.codeuri)
LOG.debug("%sCreated artifact ZIP file: %s", self.log_prefix, self._zip_file)
self._local_sha = file_checksum(cast(str, self._zip_file), hashlib.sha256())