in source/cdk_solution_helper_py/helpers_cdk/aws_solutions/cdk/synthesizers.py [0:0]
def patch_lambda(self):
"""Patch the lambda functions for S3 deployment compatibility"""
for (resource_name, resource) in self.contents.get("Resources", {}).items():
resource_type = resource.get("Type")
if (
resource_type == "AWS::Lambda::Function"
or resource_type == "AWS::Lambda::LayerVersion"
):
logger.info(f"{resource_name} ({resource_type}) patching")
# the key for S3Key for AWS::Lambda:LayerVersion is under "Content".
# the key for S3Key FOR AWS::Lambda::Function is under "Code"
content_key = (
"Content"
if resource_type == "AWS::Lambda::LayerVersion"
else "Code"
)
try:
resource_id = resource["Properties"][content_key]["S3Key"].split(
"."
)[0]
except KeyError:
logger.warning(
"found resource without an S3Key (this typically occurs when using inline code or during test)"
)
continue
asset = self.assets["files"][resource_id]
asset_source_path = self.path.parent.joinpath(asset["source"]["path"])
asset_packaging = asset["source"]["packaging"]
# CDK does not zip assets prior to deployment - we do it here if a zip asset is detected
if asset_packaging == "zip":
# create archive if necessary
logger.info(f"{resource_name} packaging into .zip file")
archive = shutil.make_archive(
base_name=asset_source_path,
format="zip",
root_dir=str(asset_source_path),
)
elif asset_packaging == "file":
archive = self.cloud_assembly_path.joinpath(asset["source"]["path"])
else:
raise ValueError(
f"Unsupported asset packaging format: {asset_packaging}"
)
# rename archive to match the resource name it was generated for
archive_name = f"{resource_name}.zip"
archive_path = self.cloud_assembly_path.joinpath(archive_name)
shutil.move(src=archive, dst=archive_path)
# update CloudFormation resource properties for S3Bucket and S3Key
# fmt: off
resource["Properties"][content_key]["S3Bucket"] = {
"Fn::Join": [ # NOSONAR (python:S1192) - string for clarity
"-",
[
{
"Fn::FindInMap": ["SourceCode", "General", "S3Bucket"] # NOSONAR (python:S1192) - string for clarity
},
{"Ref": "AWS::Region"},
],
]
}
resource["Properties"][content_key]["S3Key"] = {
"Fn::Join": [ # NOSONAR (python:S1192) - string for clarity
"/",
[
{
"Fn::FindInMap": ["SourceCode", "General", "KeyPrefix"] # NOSONAR (python:S1192) - string for clarity
},
archive_name,
],
]
}
# fmt: on
# add resource to the list of regional assets
self.assets_regional.append(archive_path)