# check_config() — extracted from source/soca/cluster_manager/add_nodes.py


def check_config(**kwargs):
    """Validate and normalize node-provisioning parameters.

    Receives the raw job/CLI parameters as keyword arguments, coerces
    string booleans, validates instance types, weighted capacity, FSx for
    Lustre settings, subnets, placement group, spot options, EFA support,
    etc., and fills in defaults.

    Returns:
        The normalized kwargs dict when everything validates, otherwise
        the value produced by return_message() for the last validation
        error encountered.
    """
    error = False
    # Convert str to bool when possible
    for k, v in kwargs.items():
        if str(v).lower() in ['true', 'yes', 'y', 'on']:
            kwargs[k] = True
        if str(v).lower() in ['false', 'no', 'n', 'off']:
            kwargs[k] = False

    # Transform instance_type as list in case multiple type are specified
    kwargs['instance_type'] = kwargs['instance_type'].split("+")

    # desired_capacity is fed to int() many times below; fail fast with a
    # clear message instead of raising an uncaught ValueError later on.
    try:
        int(kwargs['desired_capacity'])
    except (TypeError, ValueError):
        return return_message('Desired Capacity must be an int')

    # Transform weighted_capacity as a list in case multiple instance types are specified
    # Confirm that the length of weighted_capacity list is consistent with the length of instance_type list
    weighted_capacity = []
    if kwargs['weighted_capacity'] is not False:
        for item in kwargs['weighted_capacity'].split("+"):
            try:
                item = int(item)
            except ValueError:
                error = return_message('All values specified for --weighted_capacity must be integers. Found: ' + str(item))
            weighted_capacity.append(item)
        kwargs['weighted_capacity'] = weighted_capacity
        if len(kwargs['instance_type']) != len(kwargs['weighted_capacity']):
            error = return_message('Number of --weighted_capacity entries is not consistent with --instance_type entries')

    # Validate terminate_when_idle
    if 'terminate_when_idle' not in kwargs.keys():
        kwargs['terminate_when_idle'] = 0

    # Must convert true,True into bool() and false/False
    if kwargs['job_id'] is None and kwargs['keep_forever'] is False and int(kwargs['terminate_when_idle']) == 0:
        error = return_message('--job_id, --keep_forever True, or --terminate_when_idle N>0 must be specified')

    # Ensure jobId is not None when using keep_forever
    if kwargs['job_id'] is None and (kwargs['keep_forever'] is True or int(kwargs['terminate_when_idle']) > 0):
        kwargs['job_id'] = kwargs['stack_uuid']

    # Ensure anonymous metric is either True or False.
    if kwargs['anonymous_metrics'] not in [True, False]:
        kwargs['anonymous_metrics'] = True

    # Ensure force_ri is either True or False.
    if kwargs['force_ri'] not in [True, False]:
        kwargs['force_ri'] = False

    if kwargs['force_ri'] is True and kwargs['spot_price'] is False:
        # Job can only run on Reserved Instance. We ignore if SpotFleet is enabled
        try:
            instance_type_info
        except NameError:
            instance_type_info = {}

        for instance_type in kwargs["instance_type"]:
            check_ri = verify_ri_saving_availabilities(instance_type, instance_type_info)
            if (check_ri[instance_type]["current_instance_in_use"] + int(kwargs['desired_capacity'])) > check_ri[instance_type]["current_ri_purchased"]:
                error = return_message("Not enough RI to cover for this job. Instance type: {}, number of running instances: {}, number of purchased RIs: {}, capacity requested: {}. Either purchase more RI or allow usage of On Demand".format(
                            instance_type,
                            check_ri[instance_type]["current_instance_in_use"],
                            check_ri[instance_type]["current_ri_purchased"],
                            kwargs['desired_capacity']))
            else:
                # Update the number of current_instance_in_use with the number of new instance that this job will launch
                instance_type_info[instance_type] = {'current_instance_in_use': check_ri[instance_type]["current_instance_in_use"] + int(kwargs['desired_capacity'])}

    # Default System metrics to False unless explicitly set to True
    if kwargs['system_metrics'] is not True:
        kwargs['system_metrics'] = False

    if 'tags' not in kwargs.keys() or kwargs['tags'] is None:
        kwargs['tags'] = {}
    else:
        try:
            kwargs['tags'] = ast.literal_eval(kwargs['tags'])
            if not isinstance(kwargs['tags'], dict):
                error = return_message('Tags must be a valid dictionary')
        # literal_eval raises SyntaxError (not only ValueError) on malformed input
        except (ValueError, SyntaxError):
            error = return_message('Tags must be a valid dictionary')

    # FSx Management
    kwargs['fsx_lustre_configuration'] = {
        'fsx_lustre': kwargs['fsx_lustre'],
        's3_backend': False,
        'existing_fsx': False,
        'import_path': False,
        'export_path': False,
        'deployment_type':  kwargs['fsx_lustre_deployment_type'],
        'per_unit_throughput': False,
        'capacity': 1200
    }

    if kwargs['fsx_lustre'] is not False:
        fsx_deployment_type_allowed = ["scratch_1", "scratch_2", "persistent_1"]
        fsx_lustre_per_unit_throughput_allowed = [50, 100, 200]

        if kwargs["fsx_lustre_deployment_type"].lower() not in fsx_deployment_type_allowed:
            # record the error instead of discarding the message
            error = return_message('FSx Deployment Type must be: ' + ",".join(fsx_deployment_type_allowed))
        else:
            kwargs["fsx_lustre_configuration"]["fsx_lustre_deployment_type"] = kwargs["fsx_lustre_deployment_type"].upper()

        # If deployment_type is PERSISTENT, configure Per unit throughput.
        # Check the raw kwarg (the configuration key is only set when the
        # deployment type validated above, so reading it could KeyError).
        if kwargs["fsx_lustre_deployment_type"].lower() == "persistent_1":
            try:
                per_unit_throughput = int(kwargs['fsx_lustre_per_unit_throughput'])
            except (TypeError, ValueError):
                error = return_message('FSx Per Unit Throughput must be an int')
            else:
                if per_unit_throughput not in fsx_lustre_per_unit_throughput_allowed:
                    error = return_message('FSx Per Unit Throughput must be: ' + ",".join(str(x) for x in fsx_lustre_per_unit_throughput_allowed))
                else:
                    kwargs["fsx_lustre_configuration"]["per_unit_throughput"] = per_unit_throughput

        if kwargs['fsx_lustre'] is not True:
            # when fsx_lustre is set to True, only create a FSx without S3 backend
            if kwargs['fsx_lustre'].startswith("fs-"):
                kwargs['fsx_lustre_configuration']['existing_fsx'] = kwargs['fsx_lustre']
            else:
                if kwargs['fsx_lustre'].startswith("s3://"):
                    kwargs['fsx_lustre_configuration']['s3_backend'] = kwargs['fsx_lustre']
                else:
                    kwargs['fsx_lustre_configuration']['s3_backend'] = "s3://" + kwargs['fsx_lustre']

                # Verify if SOCA has permissions to access S3 backend.
                # The value may carry +<export_path>+<import_path> suffixes,
                # so strip them to keep only the bucket name for the ACL check.
                try:
                    s3.get_bucket_acl(Bucket=kwargs['fsx_lustre'].split('s3://')[-1].split('+')[0])
                except exceptions.ClientError:
                    error = return_message('SOCA does not have access to this bucket (' + kwargs['fsx_lustre'] + '). Refer to the documentation to update IAM policy.')

                # Verify if user specified custom Import/Export path.
                # Syntax is fsx_lustre=<bucket>+<export_path>+<import_path>
                check_user_specified_path = kwargs['fsx_lustre'].split('+')
                if len(check_user_specified_path) == 1:
                    pass
                elif len(check_user_specified_path) == 2:
                    # import path default to bucket root if not specified
                    kwargs['fsx_lustre_configuration']['export_path'] = check_user_specified_path[1]
                    kwargs['fsx_lustre_configuration']['import_path'] = kwargs['fsx_lustre_configuration']['s3_backend']
                elif len(check_user_specified_path) == 3:
                    # When customers specified both import and export path
                    kwargs['fsx_lustre_configuration']['export_path'] = check_user_specified_path[1]
                    kwargs['fsx_lustre_configuration']['import_path'] = check_user_specified_path[2]
                else:
                    error = return_message('Error setting up Import/Export path: ' + kwargs['fsx_lustre'] + '). Syntax is <bucket_name>+<export_path>+<import_path>. If import_path is not specified it defaults to bucket root level')

        if kwargs['fsx_lustre_size'] is not False:
            fsx_lustre_capacity_allowed = [1200, 2400, 3600, 7200, 10800]
            if int(kwargs['fsx_lustre_size']) not in fsx_lustre_capacity_allowed:
                error = return_message('fsx_lustre_size must be: ' + ','.join(str(x) for x in fsx_lustre_capacity_allowed))
            else:
                kwargs['fsx_lustre_configuration']['capacity'] = kwargs['fsx_lustre_size']

    # SpotFleet is used for spot jobs with no on-demand/spot mix and more
    # than one node or more than one instance type.
    SpotFleet = True if (kwargs['spot_price'] is not False and kwargs['spot_allocation_count'] is False and (int(kwargs['desired_capacity']) > 1 or len(kwargs['instance_type']) > 1)) else False

    if kwargs['subnet_id'] is False:
        if SpotFleet is True:
            kwargs['subnet_id'] = soca_configuration["PrivateSubnets"]
        else:
            kwargs['subnet_id'] = [random.choice(soca_configuration["PrivateSubnets"])]
    else:
        if isinstance(kwargs['subnet_id'], int):
            if kwargs['subnet_id'] == 2:
                kwargs['subnet_id'] = random.sample(soca_configuration["PrivateSubnets"], 2)
            elif kwargs['subnet_id'] == 3:
                kwargs['subnet_id'] = random.sample(soca_configuration["PrivateSubnets"], 3)
            else:
                error = return_message('Approved value for subnet_id are either the actual subnet ID or 2 or 3')
        else:
            kwargs['subnet_id'] = kwargs['subnet_id'].split('+')
            for subnet in kwargs['subnet_id']:
                if subnet not in soca_configuration["PrivateSubnets"]:
                    error = return_message('Incorrect subnet_id. Must be one of ' + ','.join(soca_configuration["PrivateSubnets"]))

    # Handle placement group logic
    if 'placement_group' not in kwargs.keys():
        pg_user_defined = False
        # Default PG to True if not present
        kwargs['placement_group'] = True if SpotFleet is False else False
    else:
        pg_user_defined = True
        if kwargs['placement_group'] not in [True, False]:
            kwargs['placement_group'] = False
            error = return_message('Incorrect placement_group. Must be True or False')

    if int(kwargs['desired_capacity']) > 1:
        if len(kwargs['subnet_id']) > 1 and pg_user_defined is True and kwargs['placement_group'] is True and SpotFleet is False:
            # more than 1 subnet specified but placement group is also configured, default to the first subnet and enable PG
            kwargs['subnet_id'] = [kwargs['subnet_id'][0]]
        else:
            if len(kwargs['subnet_id']) > 1 and pg_user_defined is False:
                kwargs['placement_group'] = False
    else:
        if int(kwargs['desired_capacity']) == 1:
            kwargs['placement_group'] = False
        else:
            # default to user specified value
            pass

    if len(kwargs['subnet_id']) > 1:
        if kwargs['placement_group'] is True and SpotFleet is False:
            # if placement group is True and more than 1 subnet is defined, force default to 1 subnet
            kwargs['subnet_id'] = [kwargs['subnet_id'][0]]

    # Validate additional security group ids
    if kwargs['security_groups']:
        sgs_id = kwargs['security_groups'].split("+")
        if len(sgs_id) > 4:
            error = return_message("You can only specify a maximum of 4 additional security groups")
        try:
            ec2.describe_security_groups(GroupIds=sgs_id)['SecurityGroups']
            kwargs['security_groups'] = sgs_id
        except Exception as err:
            error = return_message(f'Unable to validate one SG from {sgs_id} due to {err}')

    # Validate custom IAM Instance Profile
    if kwargs['instance_profile']:
        try:
            kwargs['instance_profile'] = iam.get_instance_profile(InstanceProfileName=kwargs['instance_profile'])["InstanceProfile"]["Arn"]
        except Exception as err:
            error = return_message(f"Unable to validate custom IAM instance profile {kwargs['instance_profile']} due to {err}")

    # Check core_count and ht_support
    try:
        instance_attributes = ec2.describe_instance_types(InstanceTypes=[kwargs['instance_type'][0]])
        if len(instance_attributes['InstanceTypes']) == 0:
            error = return_message('Unable to check instance: ' + kwargs['instance_type'][0])
        else:
            # boto3 does not return Default Cores/ThreadsPerCore T2 instances does not have DefaultCores
            if kwargs['instance_type'][0] in ["t2.micro", "t2.nano", "t2.small"]:
                kwargs['ht_support'] = False
            elif kwargs['instance_type'][0] in ["t2.medium", "t2.large", "t2.xlarge", "t2.2xlarge"]:
                # do not set ht_support. Will default to None if not explicitly set by the user
                pass
            else:
                kwargs['core_count'] = instance_attributes['InstanceTypes'][0]['VCpuInfo']['DefaultCores']
                if instance_attributes['InstanceTypes'][0]['VCpuInfo']['DefaultThreadsPerCore'] == 1:
                    # Set ht_support to False for instances with DefaultThreadsPerCore = 1 (e.g. graviton)
                    kwargs['ht_support'] = False
    except ClientError as e:
        if e.response['Error'].get('Code') == 'InvalidInstanceType':
            error = return_message('InvalidInstanceType: ' + kwargs['instance_type'][0])
        else:
            error = return_message('Unable to check instance: ' + kwargs['instance_type'][0])

    # Validate Spot Allocation Strategy: map user input to the spelling
    # expected by the target service (SpotFleet API vs ASG API).
    mapping = {
        "lowest-price":
            {
                "ASG": "lowest-price",
                "SpotFleet": "lowestPrice",
                "accepted_values": ["lowest-price", "lowestprice"]
            },
        "diversified":
            {
                "ASG": "capacity-optimized",
                "SpotFleet": "diversified",
                "accepted_values": ["diversified"]
            },
        "capacity-optimized":
            {
                "ASG": "capacity-optimized",
                "SpotFleet": "capacityOptimized",
                "accepted_values": ["capacityoptimized", "capacity-optimized", "optimized"]
            }
    }

    if kwargs['spot_allocation_strategy'] is not False:
        for k, v in mapping.items():
            if kwargs['spot_allocation_strategy'].lower() in v["accepted_values"]:
                if SpotFleet is True:
                    kwargs['spot_allocation_strategy'] = v["SpotFleet"]
                    break
                else:
                    kwargs['spot_allocation_strategy'] = v["ASG"]
                    break
        spot_allocation_strategy_allowed = ['lowestPrice', 'lowest-price', 'diversified', 'capacityOptimized', 'capacity-optimized']
        if kwargs['spot_allocation_strategy'] not in spot_allocation_strategy_allowed:
            error = return_message('spot_allocation_strategy_allowed (' + str(kwargs['spot_allocation_strategy']) + ') must be one of the following value: ' + ', '.join(spot_allocation_strategy_allowed))
    else:
        kwargs['spot_allocation_strategy'] = 'capacityOptimized' if SpotFleet is True else 'capacity-optimized'

    # Validate Spot Allocation Percentage
    if kwargs['spot_allocation_count'] is not False:
        if isinstance(kwargs['spot_allocation_count'], int):
            # desired_capacity may still be a string here; compare as int
            if int(kwargs['spot_allocation_count']) > int(kwargs['desired_capacity']):
                error = return_message('spot_allocation_count (' + str(kwargs['spot_allocation_count']) + ') must be an lower or equal to the number of nodes provisioned for this simulation (' + str(kwargs['desired_capacity']) + ')')
        else:
            error = return_message('spot_allocation_count (' + str(kwargs['spot_allocation_count']) + ') must be an integer')

    # Validate ht_support
    if kwargs['ht_support'] is None:
        kwargs['ht_support'] = False
    else:
        if kwargs['ht_support'] not in [True, False]:
            error = return_message('ht_support (' + str(kwargs['ht_support']) + ') must be either True or False')

    # Validate Base OS
    if kwargs['base_os'] is not False:
        base_os_allowed = ['rhel7', 'centos7', 'amazonlinux2']
        if kwargs['base_os'] not in base_os_allowed:
            error = return_message('base_os (' + str(kwargs['base_os']) + ') must be one of the following value: ' + ','.join(base_os_allowed))
    else:
        kwargs['base_os'] = soca_configuration['BaseOS']

    # Validate Spot Price
    if kwargs['spot_price'] is not False:
        if kwargs['spot_price'] == 'auto' or isinstance(kwargs['spot_price'], float):
            pass
        else:
            error = return_message('spot_price must be either "auto" or a float value"')

    # Validate EFA
    try:
        if kwargs['efa_support'] not in [True, False]:
            kwargs['efa_support'] = False
        else:
            if kwargs['efa_support'] is True:
                for instance_type in kwargs['instance_type']:
                    check_efa_support = ec2.describe_instance_types(
                        InstanceTypes=[instance_type],
                        Filters=[
                            {"Name": "network-info.efa-supported",
                             "Values": ["true"]}
                        ]
                    )

                    if len(check_efa_support["InstanceTypes"]) == 0:
                        error = return_message('You have requested EFA support but your instance  (' + instance_type + ') does not support EFA')
    except ClientError as e:
        # instance_type is a list at this point; join it so the string
        # concatenation cannot raise a TypeError inside the error handler
        if e.response['Error'].get('Code') == 'InvalidInstanceType':
            error = return_message('InvalidInstanceType: ' + ','.join(kwargs['instance_type']))
        else:
            error = return_message('Unable to check EFA support for instance: ' + ','.join(kwargs['instance_type']))

    # Validate Keep EBS
    if kwargs['keep_ebs'] not in [True, False]:
        kwargs['keep_ebs'] = False

    if error is not False:
        return error
    else:
        return kwargs