# cli/src/pcluster/templates/slurm_builder.py
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance
# with the License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
from typing import Dict

from aws_cdk import aws_dynamodb as dynamodb
from aws_cdk import aws_iam as iam
from aws_cdk import aws_lambda as awslambda
from aws_cdk import aws_route53 as route53
from aws_cdk.core import CfnCustomResource, CfnDeletionPolicy, CfnOutput, CfnParameter, Construct, Fn, Stack

from pcluster.aws.aws_api import AWSApi
from pcluster.config.cluster_config import SlurmClusterConfig
from pcluster.constants import PCLUSTER_SLURM_DYNAMODB_PREFIX
from pcluster.models.s3_bucket import S3Bucket
from pcluster.templates.cdk_builder_utils import (
PclusterLambdaConstruct,
add_cluster_iam_resource_prefix,
add_lambda_cfn_role,
create_hash_suffix,
get_cloud_watch_logs_policy_statement,
get_lambda_log_group_prefix,
)
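
# Lightweight stand-in for a Route53 hosted zone, used when the cluster reuses an
# existing (customer-managed) zone: downstream code only needs the zone id (`ref`)
# and its domain name.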
CustomDns = namedtuple("CustomDns", ["ref", "name"])


class SlurmConstruct(Construct):
"""Create the resources required when using Slurm as a scheduler."""

    def __init__(
self,
scope: Construct,
id: str,
stack_name: str,
cluster_config: SlurmClusterConfig,
bucket: S3Bucket,
managed_head_node_instance_role: iam.CfnRole,
cleanup_lambda_role: iam.CfnRole,
cleanup_lambda: awslambda.CfnFunction,
):
super().__init__(scope, id)
self.stack_scope = scope
self.stack_name = stack_name
self.config = cluster_config
self.bucket = bucket
self.cleanup_lambda_role = cleanup_lambda_role
self.cleanup_lambda = cleanup_lambda
self.managed_head_node_instance_role = managed_head_node_instance_role

        self._add_parameters()
        self._add_resources()

    # -- Utility methods --------------------------------------------------------------------------------------------- #

@property
def _stack_region(self):
return Stack.of(self).region

    @property
def _stack_account(self):
return Stack.of(self).account

    def _stack_unique_id(self):
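        """Return the unique-id segment of the stack ARN (arn:...:stack/<stack-name>/<unique-id>)."""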
return Fn.select(2, Fn.split("/", Stack.of(self).stack_id))

    def _format_arn(self, **kwargs):
return Stack.of(self).format_arn(**kwargs)

    # -- Parameters -------------------------------------------------------------------------------------------------- #

def _add_parameters(self):
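        """Define the ClusterDNSDomain parameter holding the domain of the cluster's private hosted zone."""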
if self._condition_custom_cluster_dns():
domain_name = AWSApi.instance().route53.get_hosted_zone_domain_name(
self.config.scheduling.settings.dns.hosted_zone_id
)
else:
domain_name = "pcluster."
cluster_dns_domain = f"{self.stack_name.lower()}.{domain_name}"
self.cluster_dns_domain = CfnParameter(
self.stack_scope,
"ClusterDNSDomain",
description="DNS Domain of the private hosted zone created within the cluster",
default=cluster_dns_domain,
)

    # -- Resources --------------------------------------------------------------------------------------------------- #

def _add_resources(self):
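        """Create the cluster DynamoDB table, head node IAM policies and, unless disabled, the cluster DNS resources."""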
# DynamoDB to store cluster states
self._add_dynamodb_table()
if self.managed_head_node_instance_role:
self._add_policies_to_head_node_role("HeadNode", self.managed_head_node_instance_role.ref)
self.cluster_hosted_zone = None
if not self._condition_disable_cluster_dns():
self.cluster_hosted_zone = self._add_private_hosted_zone()

    def register_policies_with_role(self, scope, managed_compute_instance_roles: Dict[str, iam.CfnRole]):
        """
        Associate the Slurm policies with the compute node roles.

        The Slurm policies specify permissions for accessing Route53 and DynamoDB resources.
        """
for queue_name, role in managed_compute_instance_roles.items():
if role:
self._add_policies_to_compute_node_role(scope, queue_name, role.ref)

    def _add_policies_to_compute_node_role(self, scope, node_name, role):
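        """Attach an inline policy that lets the queue's compute nodes query the cluster DynamoDB table."""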
suffix = create_hash_suffix(node_name)
_, policy_name = add_cluster_iam_resource_prefix(
self.config.cluster_name, self.config, "parallelcluster-slurm-compute", iam_type="AWS::IAM::Policy"
)
policy_statements = [
{
"sid": "SlurmDynamoDBTableQuery",
"effect": iam.Effect.ALLOW,
"actions": ["dynamodb:Query"],
"resources": [
self._format_arn(
service="dynamodb", resource=f"table/{PCLUSTER_SLURM_DYNAMODB_PREFIX}{self.stack_name}"
),
self._format_arn(
service="dynamodb",
resource=f"table/{PCLUSTER_SLURM_DYNAMODB_PREFIX}{self.stack_name}/index/*",
),
],
},
]
iam.CfnPolicy(
scope,
f"SlurmPolicies{suffix}",
policy_name=policy_name or "parallelcluster-slurm-compute",
policy_document=iam.PolicyDocument(
statements=[iam.PolicyStatement(**statement) for statement in policy_statements]
),
roles=[role],
)

    def _add_policies_to_head_node_role(self, node_name, role):
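        """Attach an inline policy that lets the head node read and write items in the cluster DynamoDB table."""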
suffix = create_hash_suffix(node_name)
_, policy_name = add_cluster_iam_resource_prefix(
self.config.cluster_name, self.config, "parallelcluster-slurm-head-node", iam_type="AWS::IAM::Policy"
)
policy_statements = [
{
"sid": "SlurmDynamoDBTable",
"actions": [
"dynamodb:PutItem",
"dynamodb:BatchWriteItem",
"dynamodb:GetItem",
"dynamodb:BatchGetItem",
],
"effect": iam.Effect.ALLOW,
"resources": [
self._format_arn(
service="dynamodb", resource=f"table/{PCLUSTER_SLURM_DYNAMODB_PREFIX}{self.stack_name}"
)
],
},
]
iam.CfnPolicy(
self.stack_scope,
f"SlurmPolicies{suffix}",
policy_name=policy_name or "parallelcluster-slurm-head-node",
policy_document=iam.PolicyDocument(
statements=[iam.PolicyStatement(**statement) for statement in policy_statements]
),
roles=[role],
)

    def _add_dynamodb_table(self):
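        """Create the pay-per-request DynamoDB table that stores cluster state, keyed on Id with an InstanceId GSI."""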
table = dynamodb.CfnTable(
self.stack_scope,
"SlurmDynamoDBTable",
table_name=PCLUSTER_SLURM_DYNAMODB_PREFIX + self.stack_name,
attribute_definitions=[
dynamodb.CfnTable.AttributeDefinitionProperty(attribute_name="Id", attribute_type="S"),
dynamodb.CfnTable.AttributeDefinitionProperty(attribute_name="InstanceId", attribute_type="S"),
],
key_schema=[dynamodb.CfnTable.KeySchemaProperty(attribute_name="Id", key_type="HASH")],
global_secondary_indexes=[
dynamodb.CfnTable.GlobalSecondaryIndexProperty(
index_name="InstanceId",
key_schema=[dynamodb.CfnTable.KeySchemaProperty(attribute_name="InstanceId", key_type="HASH")],
projection=dynamodb.CfnTable.ProjectionProperty(projection_type="ALL"),
)
],
billing_mode="PAY_PER_REQUEST",
)
table.cfn_options.update_replace_policy = CfnDeletionPolicy.RETAIN
table.cfn_options.deletion_policy = CfnDeletionPolicy.DELETE
self.dynamodb_table = table

    def _add_private_hosted_zone(self):
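        """Create or reference the cluster hosted zone and set up the IAM policies and cleanup resources for it."""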
if self._condition_custom_cluster_dns():
hosted_zone_id = self.config.scheduling.settings.dns.hosted_zone_id
cluster_hosted_zone = CustomDns(ref=hosted_zone_id, name=self.cluster_dns_domain.value_as_string)
else:
cluster_hosted_zone = route53.CfnHostedZone(
self.stack_scope,
"Route53HostedZone",
name=self.cluster_dns_domain.value_as_string,
vpcs=[route53.CfnHostedZone.VPCProperty(vpc_id=self.config.vpc_id, vpc_region=self._stack_region)],
)
        # If the head node instance role is created by ParallelCluster, attach the Route53 policy to it
if self.managed_head_node_instance_role:
_, policy_name = add_cluster_iam_resource_prefix(
self.config.cluster_name, self.config, "parallelcluster-slurm-route53", iam_type="AWS::IAM::Policy"
)
iam.CfnPolicy(
self.stack_scope,
"ParallelClusterSlurmRoute53Policies",
policy_name=policy_name or "parallelcluster-slurm-route53",
policy_document=iam.PolicyDocument(
statements=[
iam.PolicyStatement(
sid="Route53Add",
effect=iam.Effect.ALLOW,
actions=["route53:ChangeResourceRecordSets"],
resources=[
self._format_arn(
service="route53",
region="",
account="",
resource=f"hostedzone/{cluster_hosted_zone.ref}",
),
],
),
]
),
roles=[self.managed_head_node_instance_role.ref],
)
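
        # Lambda-backed custom resource that removes the cluster's DNS records from the
        # hosted zone when the stack is deleted.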
cleanup_route53_lambda_execution_role = None
if self.cleanup_lambda_role:
cleanup_route53_lambda_execution_role = add_lambda_cfn_role(
scope=self.stack_scope,
config=self.config,
function_id="CleanupRoute53",
statements=[
iam.PolicyStatement(
actions=["route53:ListResourceRecordSets", "route53:ChangeResourceRecordSets"],
effect=iam.Effect.ALLOW,
resources=[
self._format_arn(
service="route53",
region="",
account="",
resource=f"hostedzone/{cluster_hosted_zone.ref}",
),
],
sid="Route53DeletePolicy",
),
get_cloud_watch_logs_policy_statement(
resource=self._format_arn(
service="logs",
account=self._stack_account,
region=self._stack_region,
resource=get_lambda_log_group_prefix("CleanupRoute53-*"),
)
),
],
has_vpc_config=self.config.lambda_functions_vpc_config,
)
cleanup_route53_lambda = PclusterLambdaConstruct(
scope=self.stack_scope,
id="CleanupRoute53FunctionConstruct",
function_id="CleanupRoute53",
bucket=self.bucket,
config=self.config,
execution_role=(
cleanup_route53_lambda_execution_role.attr_arn
if cleanup_route53_lambda_execution_role
else self.config.iam.roles.lambda_functions_role
),
handler_func="cleanup_resources",
).lambda_func
self.cleanup_route53_custom_resource = CfnCustomResource(
self.stack_scope,
"CleanupRoute53CustomResource",
service_token=cleanup_route53_lambda.attr_arn,
)
self.cleanup_route53_custom_resource.add_property_override("ClusterHostedZone", cluster_hosted_zone.ref)
self.cleanup_route53_custom_resource.add_property_override("Action", "DELETE_DNS_RECORDS")
self.cleanup_route53_custom_resource.add_property_override("ClusterDNSDomain", cluster_hosted_zone.name)
CfnOutput(
self.stack_scope,
"ClusterHostedZone",
description="Id of the private hosted zone created within the cluster",
value=cluster_hosted_zone.ref,
)
return cluster_hosted_zone

    # -- Conditions -------------------------------------------------------------------------------------------------- #

def _condition_disable_cluster_dns(self):
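        """Return whether managed cluster DNS is explicitly disabled in the cluster config."""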
return (
self.config.scheduling.settings
and self.config.scheduling.settings.dns
and self.config.scheduling.settings.dns.disable_managed_dns
)

    def _condition_custom_cluster_dns(self):
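        """Return whether the cluster config points cluster DNS at an existing hosted zone."""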
return (
self.config.scheduling.settings
and self.config.scheduling.settings.dns
and self.config.scheduling.settings.dns.hosted_zone_id
)
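

# A minimal usage sketch (not part of this module): `SlurmConstruct` is normally
# instantiated from the cluster stack builder. The scope, config, bucket, and role
# objects below are hypothetical placeholders for values built elsewhere.
#
#     slurm_construct = SlurmConstruct(
#         scope=cluster_stack,
#         id="Slurm",
#         stack_name=cluster_stack.stack_name,
#         cluster_config=cluster_config,
#         bucket=cluster_bucket,
#         managed_head_node_instance_role=head_node_role,
#         cleanup_lambda_role=cleanup_lambda_role,
#         cleanup_lambda=cleanup_lambda,
#     )
#     slurm_construct.register_policies_with_role(
#         scope=cluster_stack, managed_compute_instance_roles=compute_roles_by_queue
#     )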