in api/PclusterApiHandler.py [0:0]
def get_aws_config():
if request.args.get("region"):
config = botocore.config.Config(region_name=request.args.get("region"))
ec2 = boto3.client("ec2", config=config)
fsx = boto3.client("fsx", config=config)
efs = boto3.client("efs", config=config)
else:
ec2 = boto3.client("ec2")
fsx = boto3.client("fsx")
efs = boto3.client("efs")
keypairs = ec2.describe_key_pairs()["KeyPairs"]
vpcs = ec2.describe_vpcs()["Vpcs"]
subnets = ec2.describe_subnets()["Subnets"]
security_groups = ec2.describe_security_groups()["SecurityGroups"]
security_groups = [{k: sg[k] for k in {"GroupId", "GroupName"}} for sg in security_groups]
efa_filters = [{"Name": "network-info.efa-supported", "Values": ["true"]}]
instance_paginator = ec2.get_paginator("describe_instance_types")
efa_instances_paginator = instance_paginator.paginate(Filters=efa_filters)
efa_instance_types = []
for efa_instances in efa_instances_paginator:
efa_instance_types += [e["InstanceType"] for e in efa_instances["InstanceTypes"]]
fsx_filesystems = []
try:
fsx_filesystems = fsx.describe_file_systems()["FileSystems"]
except:
pass
fsx_volumes = []
try:
fsx_volumes = list(filter(lambda vol: (vol["Lifecycle"] == "CREATED" or vol["Lifecycle"] == "AVAILABLE"),
fsx.describe_volumes()["Volumes"]))
except:
pass
file_caches = []
try:
file_caches = list(filter(lambda file_cache: (file_cache["Lifecycle"] == "AVAILABLE"),
fsx.describe_file_caches()["FileCaches"]))
except:
pass
efs_filesystems = []
try:
efs_filesystems = efs.describe_file_systems()["FileSystems"]
except:
pass
region = ""
try:
region = boto3.Session().region_name
except:
pass
return {
"security_groups": security_groups,
"keypairs": keypairs,
"vpcs": vpcs,
"subnets": subnets,
"region": region,
"fsx_filesystems": fsx_filesystems,
"fsx_volumes": fsx_volumes,
"file_caches": file_caches,
"efs_filesystems": efs_filesystems,
"efa_instance_types": efa_instance_types,
}