def validate_sg_rules()

in installer/resources/src/find_existing_resources.py [0:0]


    def validate_sg_rules(self, cfn_params, check_fs=True):
        try:
            # Begin Verify Security Group Rules
            print(f"\n====== Please wait a little as we {fg('misty_rose_3')}validate your security group rules {attr('reset')} ======\n")
            security_groups = [cfn_params["scheduler_sg"], cfn_params["compute_node_sg"]]
            if "vpc_endpoint_sg" in cfn_params:
                security_groups.append(cfn_params["vpc_endpoint_sg"])
            sg_rules = self.get_rules_for_security_group(security_groups)
            if check_fs is True:
                fs_sg = self.get_fs_security_groups(cfn_params)

            if sg_rules["success"] is True:
                scheduler_sg_rules = sg_rules["message"][cfn_params["scheduler_sg"]]
                compute_node_sg_rules = sg_rules["message"][cfn_params["compute_node_sg"]]
                vpc_endpoint_sg_rules = sg_rules["message"].get(cfn_params.get("vpc_endpoint_sg", None), None)
            else:
                print(f"{fg('red')}Error: {sg_rules['message']} {attr('reset')}")
                sys.exit(1)

            errors = {}
            # status == True means that the check passed
            errors["SCHEDULER_SG_IN_COMPUTE"] = {
                    "status": False,
                    "error": f"Compute Node SG must allow all TCP traffic from Scheduler SG",
                    "resolution": f"Add new rule on {cfn_params['compute_node_sg']} that allow TCP ports '0-65535' for {cfn_params['scheduler_sg']}"}
            errors["COMPUTE_SG_IN_SCHEDULER"] = {
                    "status": False,
                    "error": f"Scheduler SG must allow all TCP traffic from Compute Node SG",
                    "resolution": f"Add a new rule on {cfn_params['scheduler_sg']} that allow TCP ports '0-65535' for {cfn_params['compute_node_sg']}"}
            errors["CLIENT_IP_HTTPS_IN_SCHEDULER"] = {
                    "status": False,
                    "error": f"Client IP must be allowed for port 443 (80 optional) on Scheduler SG",
                    "resolution": f"Add two rules on {cfn_params['scheduler_sg']} that allow TCP ports 80 and 443 for {self.client_ip}"}
            errors["CLIENT_IP_SSH_IN_SCHEDULER"] = {
                    "status": False,
                    "error": f"Client IP must be allowed for port 22 (SSH) on Scheduler SG",
                    "resolution": f"Add one rule on {cfn_params['scheduler_sg']} that allow TCP port 22 for {self.client_ip}"}
            errors["SCHEDULER_SG_EQUAL_COMPUTE"] = {
                    "status": False,
                    "error": "Scheduler SG and Compute SG must be different",
                    "resolution": "You must choose two different security groups"}
            errors["COMPUTE_SG_EGRESS_EFA"] = {
                    "status": False,
                    "error": "Compute SG must reference egress traffic to itself for EFA",
                    "resolution": f"Add a new (EGRESS) rule on {cfn_params['compute_node_sg']} that allow TCP ports '0-65535' for {cfn_params['compute_node_sg']}. Make sure you configure EGRESS rule and not INGRESS"}
            if 'vpc_endpoint_sg' in cfn_params:
                errors["COMPUTE_EGRESS_TO_VPC_ENDPOINTS"] = {
                        "status": False,
                        "error": "Compute SG must allow port 443 egress to the vpc endpoints security group",
                        "resolution": f"Add a new (EGRESS) rule on {cfn_params['compute_node_sg']} that allows TCP port '443' for {cfn_params['vpc_endpoint_sg']}. Make sure you configure EGRESS rule and not INGRESS"}
                errors["VPC_ENDPOINTS_INGRESS_FROM_COMPUTE"] = {
                        "status": False,
                        "error": "vpc Endpoints SG must allow port 443 ingress from the Compute SG",
                        "resolution": f"Add a new (INGRESS) rule on {cfn_params['vpc_endpoint_sg']} that allows TCP port '443' from {cfn_params['compute_node_sg']}. Make sure you configure INGRESS rule and not EGRESS"}
                errors["SCHEDULER_EGRESS_TO_VPC_ENDPOINTS"] = {
                        "status": False,
                        "error": "Scheduler SG must allow port 443 egress to the vpc endpoints security group",
                        "resolution": f"Add a new (EGRESS) rule on {cfn_params['scheduler_sg']} that allows TCP port '443' for {cfn_params['vpc_endpoint_sg']}. Make sure you configure EGRESS rule and not INGRESS"}
                errors["VPC_ENDPOINTS_INGRESS_FROM_SCHEDULER"] = {
                        "status": False,
                        "error": "vpc Endpoints SG must allow port 443 ingress from the Scheduler SG",
                        "resolution": f"Add a new (INGRESS) rule on {cfn_params['vpc_endpoint_sg']} that allows TCP port '443' from {cfn_params['scheduler_sg']}. Make sure you configure INGRESS rule and not EGRESS"}

            if check_fs is True:
                errors["FS_APP_SG"] = {
                    "status": False,
                    "error": f"SG assigned to EFS App {cfn_params['fs_apps']} must allow Scheduler SG and Compute SG",
                    "resolution": f"Add {cfn_params['scheduler_sg']} and {cfn_params['compute_node_sg']} on your EFS Apps {cfn_params['fs_apps']}"}

                errors["FS_DATA_SG"] = {
                    "status": False,
                    "error": f"SG assigned to EFS App {cfn_params['fs_data']} must allow Scheduler SG and Compute SG",
                    "resolution": f"Add {cfn_params['scheduler_sg']} and {cfn_params['compute_node_sg']} on your EFS Data {cfn_params['fs_data']}"}

            # Verify Scheduler Rules
            for rules in scheduler_sg_rules:
                if rules["from_port"] == 0 and rules["to_port"] == 65535:
                    for rule in rules["approved_ips"]:
                        if cfn_params['compute_node_sg'] in rule:
                            errors["COMPUTE_SG_IN_SCHEDULER"]["status"] = True

                if rules["from_port"] == 443 or rules["from_port"] == 22:
                    for rule in rules["approved_ips"]:
                        client_ip_netmask = 32
                        if client_ip_netmask == '32':
                            if ipaddress.IPv4Address(self.client_ip) in ipaddress.IPv4Network(rule):
                                if rules["from_port"] == 443:
                                    errors["CLIENT_IP_HTTPS_IN_SCHEDULER"]["status"] = True
                                if rules["from_port"] == 22:
                                    errors["CLIENT_IP_SSH_IN_SCHEDULER"]["status"] = True
                        else:
                            if self.client_ip in rule:
                                if rules["from_port"] == 443:
                                    errors["CLIENT_IP_HTTPS_IN_SCHEDULER"]["status"] = True
                                if rules["from_port"] == 22:
                                    errors["CLIENT_IP_SSH_IN_SCHEDULER"]["status"] = True
            # Verify Compute Node Rules
            for rules in compute_node_sg_rules:
                if rules["from_port"] == 0 and rules["to_port"] == 65535:
                    for rule in rules["approved_ips"]:
                        if cfn_params['scheduler_sg'] in rule:
                            errors["SCHEDULER_SG_IN_COMPUTE"]["status"] = True

                        if rules["type"] == "egress":
                            if cfn_params['compute_node_sg'] in rule:
                                errors["COMPUTE_SG_EGRESS_EFA"]["status"] = True
            # Verify VPC Endpoint Rules
            if 'vpc_endpoint_sg' in cfn_params:
                for rule in compute_node_sg_rules:
                    # Make sure compute node allows egress to vpc endpoints
                    if rule["type"] != "egress":
                        continue
                    for approved_ip in rule["approved_ips"]:
                        if rule["from_port"] <= 443 and rule["to_port"] >= 443:
                            if cfn_params['vpc_endpoint_sg'] in approved_ip:
                                errors["COMPUTE_EGRESS_TO_VPC_ENDPOINTS"]["status"] = True
                for rule in scheduler_sg_rules:
                    # Make sure scheduler allows egress to vpc endpoints
                    if rule["type"] != "egress":
                        continue
                    for approved_ip in rule["approved_ips"]:
                        if rule["from_port"] <= 443 and rule["to_port"] >= 443:
                            if cfn_params['vpc_endpoint_sg'] in approved_ip:
                                errors["SCHEDULER_EGRESS_TO_VPC_ENDPOINTS"]["status"] = True
                for rule in vpc_endpoint_sg_rules:
                    # Make sure endpoints allow ingress from compute nodes and scheduler
                    if rule["type"] != "ingress":
                        continue
                    for approved_ip in rule["approved_ips"]:
                        if rule["from_port"] <= 443 and rule["to_port"] >= 443:
                            if cfn_params['scheduler_sg'] in approved_ip:
                                errors["VPC_ENDPOINTS_INGRESS_FROM_SCHEDULER"]["status"] = True
                            if cfn_params['compute_node_sg'] in approved_ip:
                                errors["VPC_ENDPOINTS_INGRESS_FROM_COMPUTE"]["status"] = True

            if check_fs is True:
                if cfn_params['scheduler_sg'] in fs_sg["message"][cfn_params['fs_apps']] and cfn_params['compute_node_sg'] in fs_sg["message"][cfn_params['fs_apps']]:
                    errors["FS_APP_SG"]["status"] = True

                if cfn_params['scheduler_sg'] in fs_sg["message"][cfn_params['fs_data']] and cfn_params['compute_node_sg'] in fs_sg["message"][cfn_params['fs_data']]:
                    errors["FS_DATA_SG"]["status"] = True

            if cfn_params["scheduler_sg"] != cfn_params["compute_node_sg"]:
                errors["SCHEDULER_SG_EQUAL_COMPUTE"]["status"] = True

            sg_errors = {}

            confirm_sg_settings = False
            for error_id, error_info in errors.items():
                if error_info["status"] is False:
                    if check_fs is False and "EFS" in error_id:
                        pass
                    else:
                        print(f"{fg('yellow')}ATTENTION!! {error_info['error']} {attr('reset')}\nHow to solve: {error_info['resolution']}\n")
                        sg_errors[error_info["error"]] = error_info["resolution"]
                        confirm_sg_settings = True

            if confirm_sg_settings:
                choice = get_input("Your security groups may not be configured correctly. Verify them and determine if the warnings listed above are false-positive.\n Do you still want to continue with the installation?",
                                   None, ["yes", "no"], str)
                if choice.lower() == "no":
                    sys.exit(1)
            else:
                print(f"{fg('green')} Security Groups seem to be configured correctly{attr('reset')}")

            return {"success": True,
                    "message": ""}

        except Exception as e:
            exc_type, exc_obj, exc_tb = sys.exc_info()
            fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
            print(f"{exc_type} {fname} {exc_tb.tb_lineno}")
            return {"success": False, "message": f"{exc_type} {fname} {exc_tb.tb_lineno}"}