in source/soca/cluster_manager/add_nodes.py [0:0]
def check_config(**kwargs):
    """Validate and normalize node-provisioning parameters for a job.

    The raw kwargs typically arrive from the scheduler/CLI as strings.
    This routine:
      - coerces yes/no style strings into real booleans,
      - splits "+"-separated multi-value parameters (instance_type,
        weighted_capacity, subnet_id, security_groups, fsx_lustre paths),
      - applies defaults (terminate_when_idle, placement_group, base_os ...),
      - cross-validates FSx for Lustre, Spot, Reserved Instance, subnet
        and EFA settings against the live AWS account (via ec2/iam/s3 clients
        and soca_configuration defined at module level).

    Returns:
        dict: the normalized kwargs when every check passes, otherwise the
        payload built by return_message() describing a failure.
    """
    error = False
    # Convert str to bool when possible
    for k, v in kwargs.items():
        if str(v).lower() in ['true', 'yes', 'y', 'on']:
            kwargs[k] = True
        if str(v).lower() in ['false', 'no', 'n', 'off']:
            kwargs[k] = False

    # Validate desired_capacity up-front: it is consumed with int() by several
    # checks below (RI coverage, SpotFleet detection, placement group logic).
    # BUGFIX: the original late check `isinstance(int(x), int)` was always True
    # and its return_message() result was dropped.
    try:
        int(kwargs['desired_capacity'])
    except (TypeError, ValueError):
        return return_message('Desired Capacity must be an int')

    # Transform instance_type as list in case multiple types are specified
    kwargs['instance_type'] = kwargs['instance_type'].split("+")

    # Transform weighted_capacity as a list in case multiple instance types are
    # specified, then confirm both lists have the same length
    weighted_capacity = []
    if kwargs['weighted_capacity'] is not False:
        for item in kwargs['weighted_capacity'].split("+"):
            try:
                item = int(item)
            except ValueError:
                error = return_message('All values specified for --weighted_capacity must be integers. Found: ' + str(item))
            weighted_capacity.append(item)
        kwargs['weighted_capacity'] = weighted_capacity
        if len(kwargs['instance_type']) != len(kwargs['weighted_capacity']):
            error = return_message('Number of --weighted_capacity entries is not consistent with --instance_type entries')

    # Validate terminate_when_idle (defaults to 0 = disabled)
    if 'terminate_when_idle' not in kwargs.keys():
        kwargs['terminate_when_idle'] = 0
    try:
        terminate_when_idle = int(kwargs['terminate_when_idle'])
    except (TypeError, ValueError):
        return return_message('--terminate_when_idle must be an integer')

    # At least one node-retirement policy must be specified
    if kwargs['job_id'] is None and kwargs['keep_forever'] is False and terminate_when_idle == 0:
        error = return_message('--job_id, --keep_forever True, or --terminate_when_idle N>0 must be specified')

    # Ensure job_id is not None when using keep_forever / terminate_when_idle
    if kwargs['job_id'] is None and (kwargs['keep_forever'] is True or terminate_when_idle > 0):
        kwargs['job_id'] = kwargs['stack_uuid']

    # Ensure anonymous metric is either True or False
    if kwargs['anonymous_metrics'] not in [True, False]:
        kwargs['anonymous_metrics'] = True

    # Ensure force_ri is either True or False
    if kwargs['force_ri'] not in [True, False]:
        kwargs['force_ri'] = False

    if kwargs['force_ri'] is True and kwargs['spot_price'] is False:
        # Job can only run on Reserved Instances. We ignore if SpotFleet is enabled.
        # NOTE: the original `try: instance_type_info / except NameError` always
        # hit the except branch inside this function's scope (the later local
        # assignment makes the bare reference an UnboundLocalError), so it is
        # equivalent to a plain initialization.
        instance_type_info = {}
        for instance_type in kwargs["instance_type"]:
            check_ri = verify_ri_saving_availabilities(instance_type, instance_type_info)
            if (check_ri[instance_type]["current_instance_in_use"] + int(kwargs['desired_capacity'])) > check_ri[instance_type]["current_ri_purchased"]:
                error = return_message("Not enough RI to cover for this job. Instance type: {}, number of running instances: {}, number of purchased RIs: {}, capacity requested: {}. Either purchase more RI or allow usage of On Demand".format(
                    instance_type,
                    check_ri[instance_type]["current_instance_in_use"],
                    check_ri[instance_type]["current_ri_purchased"],
                    kwargs['desired_capacity']))
            else:
                # Account for the instances this job will launch so subsequent
                # instance types are checked against the updated usage
                instance_type_info[instance_type] = {'current_instance_in_use': check_ri[instance_type]["current_instance_in_use"] + int(kwargs['desired_capacity'])}

    # Default System metrics to False unless explicitly set to True
    if kwargs['system_metrics'] is not True:
        kwargs['system_metrics'] = False

    # Validate tags: must be a literal Python dict
    if 'tags' not in kwargs.keys() or kwargs['tags'] is None:
        kwargs['tags'] = {}
    else:
        try:
            kwargs['tags'] = ast.literal_eval(kwargs['tags'])
            if not isinstance(kwargs['tags'], dict):
                error = return_message('Tags must be a valid dictionary')
        except (ValueError, SyntaxError):
            # BUGFIX: ast.literal_eval raises SyntaxError on malformed input,
            # which the original ValueError-only handler let escape
            error = return_message('Tags must be a valid dictionary')

    # FSx Management
    kwargs['fsx_lustre_configuration'] = {
        'fsx_lustre': kwargs['fsx_lustre'],
        's3_backend': False,
        'existing_fsx': False,
        'import_path': False,
        'export_path': False,
        'deployment_type': kwargs['fsx_lustre_deployment_type'],
        'per_unit_throughput': False,
        'capacity': 1200
    }
    if kwargs['fsx_lustre'] is not False:
        fsx_deployment_type_allowed = ["scratch_1", "scratch_2", "persistent_1"]
        fsx_lustre_per_unit_throughput_allowed = [50, 100, 200]
        # Default to SCRATCH_2 if incorrect value is specified
        if kwargs["fsx_lustre_deployment_type"].lower() not in fsx_deployment_type_allowed:
            # BUGFIX: the original dropped this error and never set
            # fsx_lustre_deployment_type, causing a KeyError two lines below.
            error = return_message('FSx Deployment Type must be: ' + ",".join(fsx_deployment_type_allowed))
            kwargs["fsx_lustre_configuration"]["fsx_lustre_deployment_type"] = "SCRATCH_2"
        else:
            kwargs["fsx_lustre_configuration"]["fsx_lustre_deployment_type"] = kwargs["fsx_lustre_deployment_type"].upper()

        # If deployment_type is PERSISTENT_1, configure per-unit throughput (MB/s/TiB)
        if kwargs["fsx_lustre_configuration"]["fsx_lustre_deployment_type"].lower() == "persistent_1":
            try:
                per_unit_throughput = int(kwargs['fsx_lustre_per_unit_throughput'])
            except (TypeError, ValueError):
                # BUGFIX: original isinstance(int(x), int) was always True and
                # the error message was dropped
                error = return_message('FSx Per Unit Throughput must be an int')
            else:
                if per_unit_throughput not in fsx_lustre_per_unit_throughput_allowed:
                    # BUGFIX: original compared the raw string against ints
                    # (always mismatched), used a copy-pasted "Deployment Type"
                    # message, and crashed joining a list of ints
                    error = return_message('FSx Per Unit Throughput must be: ' + ",".join(str(x) for x in fsx_lustre_per_unit_throughput_allowed))
                else:
                    kwargs["fsx_lustre_configuration"]["per_unit_throughput"] = per_unit_throughput

        if kwargs['fsx_lustre'] is not True:
            # When fsx_lustre is True we only create an FSx with no S3 backend.
            # Otherwise the value is either an existing filesystem id (fs-xxx)
            # or an S3 backend, optionally with custom export/import paths:
            # fsx_lustre=<bucket>+<export_path>+<import_path>
            if kwargs['fsx_lustre'].startswith("fs-"):
                kwargs['fsx_lustre_configuration']['existing_fsx'] = kwargs['fsx_lustre']
            else:
                check_user_specified_path = kwargs['fsx_lustre'].split('+')
                # BUGFIX: s3_backend and the ACL check previously used the full
                # raw value, including any +export+import suffix, so custom
                # paths always failed the bucket permission check
                s3_backend = check_user_specified_path[0]
                if not s3_backend.startswith("s3://"):
                    s3_backend = "s3://" + s3_backend
                kwargs['fsx_lustre_configuration']['s3_backend'] = s3_backend
                # Verify if SOCA has permissions to access the S3 backend
                try:
                    s3.get_bucket_acl(Bucket=s3_backend.split('s3://')[-1])
                except exceptions.ClientError:
                    error = return_message('SOCA does not have access to this bucket (' + kwargs['fsx_lustre'] + '). Refer to the documentation to update IAM policy.')
                # Apply user-specified Import/Export paths
                if len(check_user_specified_path) == 1:
                    pass
                elif len(check_user_specified_path) == 2:
                    # import path defaults to bucket root if not specified
                    kwargs['fsx_lustre_configuration']['export_path'] = check_user_specified_path[1]
                    kwargs['fsx_lustre_configuration']['import_path'] = kwargs['fsx_lustre_configuration']['s3_backend']
                elif len(check_user_specified_path) == 3:
                    # Customer specified both export and import paths
                    kwargs['fsx_lustre_configuration']['export_path'] = check_user_specified_path[1]
                    kwargs['fsx_lustre_configuration']['import_path'] = check_user_specified_path[2]
                else:
                    error = return_message('Error setting up Import/Export path: ' + kwargs['fsx_lustre'] + '). Syntax is <bucket_name>+<export_path>+<import_path>. If import_path is not specified it defaults to bucket root level')

        if kwargs['fsx_lustre_size'] is not False:
            fsx_lustre_capacity_allowed = [1200, 2400, 3600, 7200, 10800]
            # BUGFIX: guard int() so a non-numeric size produces the error
            # message instead of an unhandled ValueError
            try:
                fsx_lustre_size = int(kwargs['fsx_lustre_size'])
            except (TypeError, ValueError):
                fsx_lustre_size = None
            if fsx_lustre_size not in fsx_lustre_capacity_allowed:
                error = return_message('fsx_lustre_size must be: ' + ','.join(str(x) for x in fsx_lustre_capacity_allowed))
            else:
                kwargs['fsx_lustre_configuration']['capacity'] = kwargs['fsx_lustre_size']

    # SpotFleet is used when a spot price is requested without a spot/OD mix and
    # the request spans multiple instances or multiple instance types
    SpotFleet = True if (kwargs['spot_price'] is not False and kwargs['spot_allocation_count'] is False and (int(kwargs['desired_capacity']) > 1 or len(kwargs['instance_type']) > 1)) else False

    # Subnet selection / validation
    if kwargs['subnet_id'] is False:
        if SpotFleet is True:
            # SpotFleet can spread across all private subnets
            kwargs['subnet_id'] = soca_configuration["PrivateSubnets"]
        else:
            kwargs['subnet_id'] = [random.choice(soca_configuration["PrivateSubnets"])]
    else:
        if isinstance(kwargs['subnet_id'], int):
            # User asked for N random subnets (2 or 3) instead of explicit ids
            if kwargs['subnet_id'] == 2:
                kwargs['subnet_id'] = random.sample(soca_configuration["PrivateSubnets"], 2)
            elif kwargs['subnet_id'] == 3:
                kwargs['subnet_id'] = random.sample(soca_configuration["PrivateSubnets"], 3)
            else:
                error = return_message('Approved value for subnet_id are either the actual subnet ID or 2 or 3')
        else:
            kwargs['subnet_id'] = kwargs['subnet_id'].split('+')
            for subnet in kwargs['subnet_id']:
                if subnet not in soca_configuration["PrivateSubnets"]:
                    error = return_message('Incorrect subnet_id. Must be one of ' + ','.join(soca_configuration["PrivateSubnets"]))

    # Handle placement group logic
    if 'placement_group' not in kwargs.keys():
        pg_user_defined = False
        # Default PG to True unless SpotFleet is used
        kwargs['placement_group'] = True if SpotFleet is False else False
    else:
        pg_user_defined = True
        if kwargs['placement_group'] not in [True, False]:
            kwargs['placement_group'] = False
            error = return_message('Incorrect placement_group. Must be True or False')

    if int(kwargs['desired_capacity']) > 1:
        if len(kwargs['subnet_id']) > 1 and pg_user_defined is True and kwargs['placement_group'] is True and SpotFleet is False:
            # More than 1 subnet specified but placement group is also requested:
            # default to the first subnet and keep PG enabled
            kwargs['subnet_id'] = [kwargs['subnet_id'][0]]
    else:
        if len(kwargs['subnet_id']) > 1 and pg_user_defined is False:
            kwargs['placement_group'] = False
        else:
            if int(kwargs['desired_capacity']) == 1:
                # a single node gains nothing from a placement group
                kwargs['placement_group'] = False
            else:
                # default to user specified value
                pass

    if len(kwargs['subnet_id']) > 1:
        if kwargs['placement_group'] is True and SpotFleet is False:
            # Placement group with multiple subnets: force a single subnet
            kwargs['subnet_id'] = [kwargs['subnet_id'][0]]

    # Validate additional security group ids (max 4 extra)
    if kwargs['security_groups']:
        sgs_id = kwargs['security_groups'].split("+")
        if len(sgs_id) > 4:
            error = return_message("You can only specify a maximum of 4 additional security groups")
        try:
            ec2.describe_security_groups(GroupIds=sgs_id)['SecurityGroups']
            kwargs['security_groups'] = sgs_id
        except Exception as err:
            error = return_message(f'Unable to validate one SG from {sgs_id} due to {err}')

    # Validate custom IAM Instance Profile (resolve name -> ARN)
    if kwargs['instance_profile']:
        try:
            kwargs['instance_profile'] = iam.get_instance_profile(InstanceProfileName=kwargs['instance_profile'])["InstanceProfile"]["Arn"]
        except Exception as err:
            error = return_message(f"Unable to validate custom IAM instance profile {kwargs['instance_profile']} due to {err}")

    # Check core_count and ht_support based on the first instance type
    try:
        instance_attributes = ec2.describe_instance_types(InstanceTypes=[kwargs['instance_type'][0]])
        if len(instance_attributes['InstanceTypes']) == 0:
            error = return_message('Unable to check instance: ' + kwargs['instance_type'][0])
        else:
            # boto3 does not return DefaultCores for some T2 instances
            if kwargs['instance_type'][0] in ["t2.micro", "t2.nano", "t2.small"]:
                kwargs['ht_support'] = False
            elif kwargs['instance_type'][0] in ["t2.medium", "t2.large", "t2.xlarge", "t2.2xlarge"]:
                # do not set ht_support. Will default to None if not explicitly set by the user
                pass
            else:
                kwargs['core_count'] = instance_attributes['InstanceTypes'][0]['VCpuInfo']['DefaultCores']
                if instance_attributes['InstanceTypes'][0]['VCpuInfo']['DefaultThreadsPerCore'] == 1:
                    # Instances with DefaultThreadsPerCore == 1 (e.g. Graviton) cannot enable HT
                    kwargs['ht_support'] = False
    except ClientError as e:
        if e.response['Error'].get('Code') == 'InvalidInstanceType':
            error = return_message('InvalidInstanceType: ' + kwargs['instance_type'][0])
        else:
            error = return_message('Unable to check instance: ' + kwargs['instance_type'][0])

    # Validate Spot Allocation Strategy: translate user-friendly aliases to the
    # exact token expected by ASG vs SpotFleet APIs
    mapping = {
        "lowest-price":
            {
                "ASG": "lowest-price",
                "SpotFleet": "lowestPrice",
                "accepted_values": ["lowest-price", "lowestprice"]
            },
        "diversified":
            {
                "ASG": "capacity-optimized",
                "SpotFleet": "diversified",
                "accepted_values": ["diversified"]
            },
        "capacity-optimized":
            {
                "ASG": "capacity-optimized",
                "SpotFleet": "capacityOptimized",
                "accepted_values": ["capacityoptimized", "capacity-optimized", "optimized"]
            }
    }
    if kwargs['spot_allocation_strategy'] is not False:
        for k, v in mapping.items():
            if kwargs['spot_allocation_strategy'].lower() in v["accepted_values"]:
                if SpotFleet is True:
                    kwargs['spot_allocation_strategy'] = v["SpotFleet"]
                    break
                else:
                    kwargs['spot_allocation_strategy'] = v["ASG"]
                    break
        spot_allocation_strategy_allowed = ['lowestPrice', 'lowest-price', 'diversified', 'capacityOptimized', 'capacity-optimized']
        if kwargs['spot_allocation_strategy'] not in spot_allocation_strategy_allowed:
            error = return_message('spot_allocation_strategy_allowed (' + str(kwargs['spot_allocation_strategy']) + ') must be one of the following value: ' + ', '.join(spot_allocation_strategy_allowed))
    else:
        kwargs['spot_allocation_strategy'] = 'capacityOptimized' if SpotFleet is True else 'capacity-optimized'

    # Validate Spot Allocation Count (mixed On-Demand/Spot)
    if kwargs['spot_allocation_count'] is not False:
        try:
            spot_allocation_count = int(kwargs['spot_allocation_count'])
        except (TypeError, ValueError):
            error = return_message('spot_allocation_count (' + str(kwargs['spot_allocation_count']) + ') must be an integer')
        else:
            # BUGFIX: original compared int to the raw (possibly str)
            # desired_capacity, a TypeError on Python 3; also accept numeric
            # strings instead of rejecting anything that is not already int
            if spot_allocation_count > int(kwargs['desired_capacity']):
                error = return_message('spot_allocation_count (' + str(kwargs['spot_allocation_count']) + ') must be an lower or equal to the number of nodes provisioned for this simulation (' + str(kwargs['desired_capacity']) + ')')

    # Validate ht_support
    if kwargs['ht_support'] is None:
        kwargs['ht_support'] = False
    else:
        if kwargs['ht_support'] not in [True, False]:
            error = return_message('ht_support (' + str(kwargs['ht_support']) + ') must be either True or False')

    # Validate Base OS (fall back to the cluster default)
    if kwargs['base_os'] is not False:
        base_os_allowed = ['rhel7', 'centos7', 'amazonlinux2']
        if kwargs['base_os'] not in base_os_allowed:
            error = return_message('base_os (' + str(kwargs['base_os']) + ') must be one of the following value: ' + ','.join(base_os_allowed))
    else:
        kwargs['base_os'] = soca_configuration['BaseOS']

    # Validate Spot Price: "auto" or an explicit float
    if kwargs['spot_price'] is not False:
        if kwargs['spot_price'] == 'auto' or isinstance(kwargs['spot_price'], float):
            pass
        else:
            error = return_message('spot_price must be either "auto" or a float value"')

    # Validate EFA: every requested instance type must support EFA
    try:
        if kwargs['efa_support'] not in [True, False]:
            kwargs['efa_support'] = False
        else:
            if kwargs['efa_support'] is True:
                for instance_type in kwargs['instance_type']:
                    check_efa_support = ec2.describe_instance_types(
                        InstanceTypes=[instance_type],
                        Filters=[
                            {"Name": "network-info.efa-supported",
                             "Values": ["true"]}
                        ]
                    )
                    if len(check_efa_support["InstanceTypes"]) == 0:
                        error = return_message('You have requested EFA support but your instance (' + instance_type + ') does not support EFA')
    except ClientError as e:
        # BUGFIX: instance_type is a list at this point; joining avoids a
        # str+list TypeError in the error path
        if e.response['Error'].get('Code') == 'InvalidInstanceType':
            error = return_message('InvalidInstanceType: ' + ','.join(kwargs['instance_type']))
        else:
            error = return_message('Unable to check EFA support for instance: ' + ','.join(kwargs['instance_type']))

    # Validate Keep EBS
    if kwargs['keep_ebs'] not in [True, False]:
        kwargs['keep_ebs'] = False

    if error is not False:
        return error
    else:
        return kwargs