# cloudformation/custom_resource/cluster.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: AWS ParallelCluster Cluster Custom Resource Provider
Parameters:
CustomLambdaRole:
Description: Custom IAM role ARN to use for the ParallelCluster Lambda functions
Type: String
Default: ''
AdditionalIamPolicies:
Description: Comma-delimited list of additional IAM policy ARNs to attach to the Lambda execution role (only used if CustomLambdaRole isn't provided).
Type: CommaDelimitedList
Default: ''
CustomBucket:
Description: (Debug only) bucket to retrieve S3 artifacts for internal resources.
Type: String
Default: ''
Mappings:
ParallelCluster:
Constants:
Version: 3.14.0 # major.minor.patch+alpha/beta_identifier
Conditions:
CustomRoleCondition: !Not [!Equals [!Ref CustomLambdaRole, '']]
UsePCPolicies: !Not [!Condition CustomRoleCondition ]
UseAdditionalIamPolicies: !Not [!Equals [!Join ['', !Ref AdditionalIamPolicies ], '']]
UseCustomBucket: !Not [!Equals [!Ref CustomBucket, '']]
Resources:
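# Lambda layer shipping the aws-parallelcluster Python package and its dependencies used by the
# provider function below; fetched from the regional ParallelCluster artifact bucket unless
# CustomBucket is set.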
PclusterLayer:
Type: AWS::Lambda::LayerVersion
Properties:
LayerName: !Sub
- PCLayer-${StackIdSuffix}
- { StackIdSuffix: !Select [2, !Split ['/', !Ref 'AWS::StackId']] }
Description: Library containing the aws-parallelcluster Python package and its dependencies
Content:
S3Bucket: !If [ UseCustomBucket, !Ref CustomBucket, !Sub "${AWS::Region}-aws-parallelcluster" ]
S3Key: !Sub
- parallelcluster/${Version}/layers/aws-parallelcluster/lambda-layer.zip
- { Version: !FindInMap [ParallelCluster, Constants, Version] }
CompatibleRuntimes:
- python3.12
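# Nested stack that provisions the standard ParallelCluster IAM policies; created only when
# CustomLambdaRole is not supplied (UsePCPolicies condition).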
PclusterPolicies:
Condition: UsePCPolicies
Type: AWS::CloudFormation::Stack
Properties:
TemplateURL: !Sub
- https://${Bucket}.s3.${Region}.${AWS::URLSuffix}/parallelcluster/${Version}/templates/policies/policies.yaml
- { Version: !FindInMap [ParallelCluster, Constants, Version ],
Bucket: !If [UseCustomBucket, !Ref CustomBucket, !Sub "${AWS::Region}-aws-parallelcluster" ],
Region: !Ref AWS::Region }
TimeoutInMinutes: 10
Parameters:
EnableIamAdminAccess: true
PclusterCfnFunctionLogGroup:
Type: AWS::Logs::LogGroup
DeletionPolicy: Retain
Properties:
RetentionInDays: 90
LogGroupName: !Sub /aws/lambda/${PclusterCfnFunction}
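# Managed policy for CloudWatch Events permissions: crhelper's polling mechanism re-invokes the
# provider function through Events rules, so the role must be able to create and remove rules and targets.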
EventsPolicy:
Condition: UsePCPolicies
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Sid: EventsPolicy
Effect: Allow
Action:
- events:PutRule
- events:DeleteRule
- events:PutTargets
- events:RemoveTargets
Resource: !Sub arn:${AWS::Partition}:events:${AWS::Region}:${AWS::AccountId}:rule/*
S3Policy:
Condition: UsePCPolicies
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Sid: S3Policy
Effect: Allow
Action:
- s3:*Object
- s3:ListBucket
- s3:ListBucketVersions
Resource:
- !Sub arn:${AWS::Partition}:s3:::${PclusterCustomResourceBucket}/*
- !Sub arn:${AWS::Partition}:s3:::${PclusterCustomResourceBucket}
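# Bucket where the provider stores the full request event (including the potentially large
# ClusterConfiguration) so it can be re-read during create/update after the configuration has been
# stripped from the event handed to crhelper.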
PclusterCustomResourceBucket:
Type: AWS::S3::Bucket
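# Execution role for the provider Lambdas, assembled from the nested-stack policy outputs, the AWS
# Lambda basic execution policy, the Events and S3 policies above, plus any AdditionalIamPolicies;
# created only when CustomLambdaRole is not supplied.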
PclusterLambdaRole:
Condition: UsePCPolicies
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Effect: Allow
Action: sts:AssumeRole
Principal:
Service: lambda.amazonaws.com
ManagedPolicyArns: !Split
- ","
- !Sub
- ${LambdaExecutionPolicy},${ClusterPolicy1},${ClusterPolicy2},${DefaultAdminPolicy},${EventsPolicy},${S3Policy}${AdditionalIamPolicies}
- { LambdaExecutionPolicy: !Sub "arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
ClusterPolicy1: !GetAtt [ PclusterPolicies, Outputs.ParallelClusterClusterPolicy1 ],
ClusterPolicy2: !GetAtt [ PclusterPolicies, Outputs.ParallelClusterClusterPolicy2 ],
DefaultAdminPolicy: !GetAtt [ PclusterPolicies, Outputs.DefaultParallelClusterIamAdminPolicy ],
EventsPolicy: !Ref EventsPolicy,
S3Policy: !Ref S3Policy,
AdditionalIamPolicies: !If [UseAdditionalIamPolicies, !Sub [",${AdditionalPolicies}", AdditionalPolicies: !Join [',', !Ref AdditionalIamPolicies ]], ''] }
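# Custom resource provider function: Create/Update/Delete requests are served by calling the
# pcluster library from the layer, with progress reported back to CloudFormation through crhelper.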
PclusterCfnFunction:
Type: AWS::Lambda::Function
Properties:
Tags:
- Key: "parallelcluster:version"
Value: !FindInMap [ParallelCluster, Constants, Version]
- Key: "parallelcluster:custom_resource"
Value: "cluster"
FunctionName: !Sub
- pcluster-cfn-${StackIdSuffix}
- { StackIdSuffix: !Select [2, !Split ['/', !Ref 'AWS::StackId']] }
TracingConfig:
Mode: Active
MemorySize: 2048
Timeout: 60
Code:
ZipFile: !Sub |
import boto3
import cfnresponse
import datetime
import json
import logging
import os
import random
import re
import string
import sys
logger = logging.getLogger()
logger.setLevel(logging.INFO)
import pcluster.api.controllers.cluster_operations_controller
import pcluster.api.errors
import pcluster.utils
from pcluster.api import encoder
from pcluster.cli.exceptions import APIOperationException, ParameterException
from pcluster.api.errors import exception_message, NotFoundException
import pcluster.lib as pc
crhelper_path = "/opt/python/pcluster/resources/custom_resources/custom_resources_code"
sys.path.insert(0, crhelper_path)
from crhelper import CfnResource
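# crhelper implements the asynchronous custom-resource protocol: the @helper.create/update/delete
# handlers start an operation and return immediately, and the corresponding @helper.poll_* handlers
# are re-invoked until they return a value.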
helper = CfnResource()
def drop_keys(_dict, keys):
return {k: v for k, v in _dict.items() if k not in keys}
def flatten(obj, ret=None, path=""):
"""Flatten a nested map using dot-notation for keys."""
ret = {} if ret is None else ret  # avoid a mutable default dict persisting across warm Lambda invocations
if isinstance(obj, list): # convert list to dictionary for flattening
return flatten({str(i): v for i, v in enumerate(obj)}, ret, path)
for k, v in obj.items():
if isinstance(v, (dict, list)): # recurse on complex objects
flatten(v, ret, f"{path}{k}.")
else: # otherwise add with prefix
ret[path + str(k)] = v
return ret
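# For example, flatten({"a": {"b": [1, 2]}}) yields {"a.b.0": 1, "a.b.1": 2}:
# lists are flattened using their indices as keys.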
def update_response(data):
logger.info(data)
# Avoid the limit on response object size; the user already provided these values, so drop them from the response
extra_keys = {"clusterConfiguration", "scheduler", "tags"}
# create / delete responses have cluster information nested in "cluster" key,
# flatten that portion while keeping other keys to propagate warnings.
if "cluster" in data:
helper.Data.update(flatten(drop_keys(data["cluster"], extra_keys)))
validation_messages = json.dumps(data.get("validationMessages", []))
validation_messages = "TRUNCATED:" + validation_messages[:2048] if len(validation_messages) > 2048 else validation_messages
helper.Data["validationMessages"] = validation_messages
else: # without "cluster" in the keys, this is a cluster object.
helper.Data.update(flatten(drop_keys(data, extra_keys)))
def serialize(val):
return pcluster.utils.to_iso_timestr(val) if isinstance(val, datetime.date) else val
def get_stack_tags(stack_name, overrides):
cfn = boto3.client('cloudformation')
stack_tags = cfn.describe_stacks(StackName=stack_name)["Stacks"][0].get("Tags", [])
return list(filter(lambda t: not (t["Key"].startswith("aws:") or t["Key"] in overrides), stack_tags))
def create_or_update(event):
properties = event["ResourceProperties"]
full_event = json.loads(boto3.client("s3").get_object(Bucket="${PclusterCustomResourceBucket}",Key=properties.get("ClusterName")+"/event.json")["Body"].read())
cluster_configuration = full_event["ResourceProperties"]["ClusterConfiguration"]
request_type = event["RequestType"].upper()
helper.Data["validationMessages"] = "[]" # default value
if properties.get("DeletionPolicy", "Delete") not in {"Retain", "Delete"}:
raise ValueError("DeletionPolicy must be one of [\"Retain\", \"Delete\"].")
if request_type == "CREATE" and "ClusterName" not in properties:
raise ValueError("Couldn't find a ClusterName in the properties.")
elif request_type == "UPDATE" and event["PhysicalResourceId"] != properties.get("ClusterName"):
raise ValueError("Cannot update the ClusterName in the properties.")
cluster_name = properties["ClusterName"]
logger.info(f"{event['RequestType'].upper()}: {cluster_name}")
physical_resource_id = cluster_name
try:
meta_keys = {"ServiceToken", "DeletionPolicy"}
kwargs = {**{pcluster.utils.to_snake_case(k): serialize(v) for k, v in drop_keys(properties, meta_keys).items()}, "wait": False}
kwargs["cluster_configuration"] = cluster_configuration
resource_tags = [{"Key": "parallelcluster:custom_resource", "Value": "cluster"}]
config_tags = cluster_configuration.get("Tags", [])
stack_tags = get_stack_tags(event['StackId'], {t["Key"] for t in config_tags})
kwargs["cluster_configuration"]["Tags"] = stack_tags + config_tags + resource_tags
func = {"CREATE": pc.create_cluster, "UPDATE": pc.update_cluster}[request_type]
update_response(func(**kwargs))
except (APIOperationException, ParameterException, TypeError) as e:
logger.info(str(e))
raise ValueError(str(e))
except Exception as e:
message = pcluster.api.errors.exception_message(e)
# StatusReason is truncated, so skip changeset in output, still logged below
block_list = {"change_set"}
message_data = drop_keys(message.to_dict(), block_list)
logger.info(message_data)
# sort more critical errors last
if "configuration_validation_errors" in message_data and message_data["configuration_validation_errors"]:
order = {k: i for i, k in enumerate(["INFO", "WARNING", "ERROR"])}
message_data["configuration_validation_errors"].sort(key=lambda e: order[e["level"]])
str_msg = encoder.JSONEncoder().encode(message_data)
if not re.search(r"No changes found", str_msg):
logger.info(encoder.JSONEncoder().encode(message))
raise ValueError(str_msg)
logger.info(f"No changes found to update: {cluster_name}")
return physical_resource_id
@helper.create
def create(event, context):
return create_or_update(event)
@helper.update
def update(event, context):
return create_or_update(event)
@helper.delete
def delete(event, context):
properties = event["ResourceProperties"]
cluster_name = properties.get("ClusterName")
boto3.resource('s3').Bucket("${PclusterCustomResourceBucket}").objects.filter(Prefix=f"{cluster_name}/").delete()
deletion_policy = properties.get("DeletionPolicy", "Delete")
if deletion_policy not in {"Retain", "Delete"}:
raise ValueError("DeleetionPolicy must be one of [\"Retain\", \"Delete\"].")
if deletion_policy == "Retain":
return cluster_name
logger.info(f"Deleting: {cluster_name}")
try:
update_response(pc.delete_cluster(cluster_name=cluster_name))
except (ParameterException, NotFoundException): # cluster deleted or invalid name -- ignore here.
pass
except Exception as e:
message = exception_message(e)
raise ValueError(encoder.JSONEncoder().encode(message))
# Polling functionality for async CUD operations
def poll(event):
log_group = os.getenv("AWS_LAMBDA_LOG_GROUP_NAME")
cluster_name = event["ResourceProperties"].get("ClusterName")
try:
cluster = pc.describe_cluster(cluster_name=cluster_name)
status = cluster.get("clusterStatus")
if status in {"CREATE_COMPLETE", "UPDATE_COMPLETE"}:
update_response(cluster)
return cluster_name
elif status in {"CREATE_FAILED", "UPDATE_FAILED", "DELETE_FAILED"}:
reasons = ",".join(f["failureCode"] for f in cluster.get("failures", []))
raise ValueError(f"{cluster_name}: {reasons} (LogGroup: {log_group})")
# If create fails and we try to roll back (e.g. delete),
# gracefully handle the missing cluster. On the delete pathway, the
# only invalid parameter can be the cluster name.
except (ParameterException, NotFoundException):
if event["RequestType"].upper() == "DELETE":
# Returning a value here signals that the delete has completed and polling can stop;
# not returning a value causes the crhelper framework to keep polling.
return cluster_name
raise ValueError(f"{cluster_name} failed {event['RequestType'].upper()}. See LogGroup: {log_group}")
@helper.poll_create
def poll_create(event, context):
return poll(event)
@helper.poll_update
def poll_update(event, context):
return poll(event)
@helper.poll_delete
def poll_delete(event, context):
return poll(event)
def handler(event, context):
try:
logger.info("Beginning of Pcluster custom resource Lambda function. Printing full event...")
logger.info(event)
if event["ResourceProperties"].get("ClusterConfiguration") or (event.get("OldResourceProperties") and event["OldResourceProperties"].get("ClusterConfiguration")):
boto3.client('s3').put_object(
Body=json.dumps(event),
Bucket="${PclusterCustomResourceBucket}",
Key=event["ResourceProperties"].get("ClusterName")+"/event.json"
)
event["ResourceProperties"].pop("ClusterConfiguration", None)
event.get("OldResourceProperties", {}).pop("ClusterConfiguration", None)
except Exception as exception:
cfnresponse.send(event, context, cfnresponse.FAILED, {}, event.get('PhysicalResourceId', 'PclusterClusterCustomResource'), str(exception))
helper(event, context)
Handler: index.handler
Runtime: python3.12
Role: !If [CustomRoleCondition, !Ref CustomLambdaRole, !GetAtt PclusterLambdaRole.Arn]
Layers:
- !Ref PclusterLayer
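# Helper Lambda that empties the event bucket (all objects and object versions) when the stack is
# deleted, so CloudFormation can remove the bucket.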
CleanupS3bucketFunction:
Type: AWS::Lambda::Function
Properties:
Timeout: 60
Code:
ZipFile: !Sub |
import boto3
import cfnresponse
import logging
import sys
from botocore.config import Config
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def _delete_s3_artifacts(event):
"""
Delete all objects and object versions from the bucket referenced by the event.
It exits gracefully if the bucket does not exist.
:param event: custom resource event whose S3Bucket property names the bucket containing cluster artifacts
"""
bucket_name = event["ResourceProperties"]["S3Bucket"]
try:
if bucket_name != "NONE":
bucket = boto3.resource("s3", config=Config(retries={"max_attempts": 60})).Bucket(bucket_name)
logger.info("Cluster S3 artifact in %s deletion: STARTED", bucket_name)
bucket.objects.all().delete()
bucket.object_versions.delete()
logger.info("Cluster S3 artifact in %s deletion: COMPLETED", bucket_name)
except boto3.client("s3").exceptions.NoSuchBucket as ex:
logger.warning("S3 bucket %s not found. Bucket was probably manually deleted.", bucket_name)
logger.warning(ex, exc_info=True)
except Exception as e:
logger.error(
"Failed when deleting cluster S3 artifact in %s with error %s", bucket_name, e
)
raise
def handler(event, context):
try:
if event["RequestType"] == "Delete":
_delete_s3_artifacts(event)
response_status = cfnresponse.SUCCESS
reason = ""
except Exception as e:
response_status = cfnresponse.FAILED
reason = str(e)
cfnresponse.send(event, context, response_status, {}, event.get('PhysicalResourceId', 'CleanupS3bucketCustomResource'), reason)
Handler: index.handler
Runtime: python3.12
Role: !If [CustomRoleCondition, !Ref CustomLambdaRole, !GetAtt PclusterLambdaRole.Arn]
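# Custom resource that triggers the cleanup function above when this stack is deleted.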
CleanupS3bucketCustomResource:
DeletionPolicy: Delete
Properties:
S3Bucket: !Ref PclusterCustomResourceBucket
ServiceToken: !GetAtt CleanupS3bucketFunction.Arn
Type: AWS::CloudFormation::CustomResource
UpdateReplacePolicy: Delete
Outputs:
ServiceToken:
Description: ARN of the Lambda function for managing ParallelCluster cluster resources (used as the ServiceToken of the custom resource)
Value: !GetAtt PclusterCfnFunction.Arn
LogGroupArn:
Description: ARN of LogGroup for Lambda logging
Value: !GetAtt PclusterCfnFunctionLogGroup.Arn
LambdaLayerArn:
Description: ARN for the ParallelCluster Lambda Layer
Value: !Ref PclusterLayer
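# Illustrative usage sketch (comments only, not part of this template): a consuming stack would
# deploy this template as a nested stack and point a Custom::PclusterCluster resource at the
# ServiceToken output above. The resource names (PclusterClusterProvider), the subnet references,
# and the abbreviated ClusterConfiguration below are placeholders.
#
#   PclusterCluster:
#     Type: Custom::PclusterCluster
#     Properties:
#       ServiceToken: !GetAtt [PclusterClusterProvider, Outputs.ServiceToken]
#       ClusterName: !Sub 'c-${AWS::StackName}'
#       ClusterConfiguration:
#         Image:
#           Os: alinux2
#         HeadNode:
#           InstanceType: t3.medium
#           Networking:
#             SubnetId: !Ref HeadNodeSubnet
#         Scheduling:
#           Scheduler: slurm
#           SlurmQueues:
#             - Name: queue0
#               ComputeResources:
#                 - Name: compute
#                   InstanceType: t3.medium
#                   MinCount: 0
#                   MaxCount: 4
#               Networking:
#                 SubnetIds:
#                   - !Ref ComputeSubnet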