def _add_storage_security_group()

in cli/src/pcluster/templates/cluster_stack.py [0:0]


    def _add_storage_security_group(self, storage_cfn_id, storage):
        """
        Create the security group for a shared storage resource together with its ingress/egress rules.

        The group is created in ``self.stack`` inside the VPC given by ``self.config.vpc_id``. One ingress
        rule and one egress rule are then added for every head node, compute and login security group,
        and for the new storage security group itself (self-referencing rules).

        Ingress is opened for every protocol over ``ALL_PORTS_RANGE``, except for login security groups,
        where it is limited to TCP on ``EFS_PORT`` for EFS and to TCP on the first Lustre TCP entry of
        ``FSX_PORTS`` for FSx. Egress rules are created without protocol/port arguments.

        The deletion policy converted from ``storage.deletion_policy`` is applied, as both deletion and
        update-replace policy, to the security group and to its self-referencing rules only.

        :param storage_cfn_id: identifier used as prefix for the security group id and rule descriptions
        :param storage: storage configuration; ``shared_storage_type`` and ``deletion_policy`` are read
        :return: tuple ``(security group, list of rules)``; rules are ordered ingress first, then egress,
                 for each source security group
        """
        storage_type = storage.shared_storage_type
        security_group = ec2.CfnSecurityGroup(
            self.stack,
            f"{storage_cfn_id}SecurityGroup",
            group_description=f"Allow access to {storage_type} file system {storage_cfn_id}",
            vpc_id=self.config.vpc_id,
        )

        # The security group follows the same retention behavior as the storage it protects.
        deletion_policy = convert_deletion_policy(storage.deletion_policy)
        security_group.cfn_options.deletion_policy = deletion_policy
        security_group.cfn_options.update_replace_policy = deletion_policy

        peer_groups_by_kind = {
            "Head": self._get_head_node_security_groups(),
            "Compute": self._get_compute_security_groups(),
            "Login": self._get_login_security_groups(),
            "Storage": [security_group.ref],
        }

        rules = []
        for kind, peer_refs in peer_groups_by_kind.items():
            is_login = kind == "Login"
            is_self_reference = kind == "Storage"
            for index, peer_ref in enumerate(peer_refs):
                # TODO Scope down ingress rules to allow only traffic on the strictly necessary ports.
                #      Currently scoped down only on Login nodes to limit blast radius.
                protocol, port = "-1", ALL_PORTS_RANGE
                if is_login and storage_type == SharedStorageType.EFS:
                    protocol, port = "tcp", EFS_PORT
                elif is_login and storage_type == SharedStorageType.FSX:
                    protocol, port = "tcp", FSX_PORTS[LUSTRE]["tcp"][0]

                ingress = self._allow_all_ingress(
                    description=f"{storage_cfn_id}SecurityGroup{kind}Ingress{index}",
                    source_security_group_id=peer_ref,
                    group_id=security_group.ref,
                    ip_protocol=protocol,
                    port=port,
                )
                egress = self._allow_all_egress(
                    description=f"{storage_cfn_id}SecurityGroup{kind}Egress{index}",
                    destination_security_group_id=peer_ref,
                    group_id=security_group.ref,
                )
                rules.extend((ingress, egress))

                if is_self_reference:
                    # Only the self-referencing rules inherit the storage deletion policy.
                    for rule in (ingress, egress):
                        rule.cfn_options.deletion_policy = deletion_policy
                        rule.cfn_options.update_replace_policy = deletion_policy

        return security_group, rules