community/front-end/ofe/website/ghpcfe/cluster_manager/cloud_info.py (610 lines of code) (raw):

# Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Cloud interrogation routines""" import json import logging import time from collections import defaultdict from functools import lru_cache import archspec.cpu import google.cloud.exceptions import googleapiclient.discovery from google.cloud import storage as gcs from google.cloud.billing_v1.services import cloud_catalog from google.oauth2 import service_account logger = logging.getLogger(__name__) gcp_machine_table = defaultdict( lambda: defaultdict(lambda: "x86_64"), { # General Purpose "e2": defaultdict(lambda: "x86_64"), "n2": defaultdict(lambda: "cascadelake"), "n2d": defaultdict(lambda: "zen2"), "n1": defaultdict(lambda: "x86_64"), "c3": defaultdict(lambda: "sapphirerapids"), "c3d": defaultdict(lambda: "zen2"), "c4": defaultdict(lambda: "emeraldrapids"), # Compute Optimized "c2": defaultdict(lambda: "cascadelake"), "c2d": defaultdict( lambda: "zen2" # TODO: Should be zen3, but CentOS7 doesn't have ), # a new enough kernel to recognize as such. "t2d": defaultdict(lambda: "zen2"), # TODO: Should also be zen3 "h3": defaultdict(lambda: "sapphirerapids"), # Memory Optimized "m2": defaultdict(lambda: "icelake"), "m2": defaultdict(lambda: "cascadelake"), "m1": defaultdict( lambda: "broadwell", {"megamem": "skylake_avx512", "ultramem": "broadwell"}, ), # Accelerated "a2": defaultdict(lambda: "cascadelake"), }, ) def _get_arch_for_node_type_gcp(instance): try: family, group, _ = instance.split("-", maxsplit=2) return gcp_machine_table[family][group] except ValueError: logger.error(f"Invalid instance format: {instance}") return None except KeyError: logger.error(f"Keys not found in gcp_machine_table: {instance}") return None def _get_gcp_client(credentials, service="compute", api_version="v1"): cred_info = json.loads(credentials) creds = service_account.Credentials.from_service_account_info(cred_info) return ( cred_info["project_id"], googleapiclient.discovery.build( service, api_version, credentials=creds, cache_discovery=False ), ) def _get_vm_reservations(credentials, zone, ttl_hash=None): try: # logger.info(f"Fetching VM reservations for credentials: {credentials}, zone: {zone}") project, client = _get_gcp_client(credentials) req = client.reservations().list(project=project, zone=zone) resp = req.execute() if "items" not in resp: # logger.info("No reservations found") return {} data = { reservation["name"]: { "name": reservation["name"], "specificReservationRequired": reservation.get("specificReservationRequired", False), "status": reservation["status"], "instanceProperties": { "machineType": reservation .get("specificReservation", {}) .get("instanceProperties", {}) .get("machineType", ""), "minCpuPlatform": reservation .get("specificReservation", {}) .get("instanceProperties", {}) .get("minCpuPlatform", ""), "availableCount": int( reservation .get("specificReservation", {}) .get("count", 0) ) }, "shareSettings": reservation.get("shareSettings", {}), } for reservation in resp["items"] } # logger.info(f"Reservations data: {data}") return data except Exception as e: logger.error(f"Error fetching VM reservations: {e}") return {} def get_vm_reservations(cloud_provider, credentials, unused_region, zone): if cloud_provider == "GCP": return _get_vm_reservations(credentials, zone, ttl_hash=_get_ttl_hash()) else: raise Exception(f'Unsupported Cloud Provider "{cloud_provider}"') @lru_cache def _get_gcp_disk_types( credentials, zone, ttl_hash=None ): # pylint: disable=unused-argument (project, client) = _get_gcp_client(credentials) req = client.diskTypes().list(project=project, zone=zone) resp = req.execute() return [ { "description": x["description"], "name": x["name"], "minSizeGB": int(x["validDiskSize"].split("-")[0][:-2]), "maxSizeGB": int(x["validDiskSize"].split("-")[1][:-2]), } for x in resp.get("items", []) ] def get_disk_types(cloud_provider, credentials, unused_region, zone): if cloud_provider == "GCP": return _get_gcp_disk_types( credentials, zone, ttl_hash=_get_ttl_hash() ) else: raise Exception(f'Unsupport Cloud Provider "{cloud_provider}"') @lru_cache def _get_gcp_machine_types( credentials, zone, ttl_hash=None ): # pylint: disable=unused-argument (project, client) = _get_gcp_client(credentials) # Fetch disk types dynamically disk_types = _get_gcp_disk_types(credentials, zone, ttl_hash=ttl_hash) disk_type_names = [disk_type["name"] for disk_type in disk_types] req = client.machineTypes().list( project=project, zone=zone, filter="isSharedCpu=False" ) resp = req.execute() if "items" not in resp: return [] invalid_disk_types = { "c4-": [ "local-ssd", "pd-standard", "pd-balanced", "pd-ssd", "pd-extreme", "hyperdisk-ml", "hyperdisk-throughput" ], "c3-": [ "pd-extreme", "pd-standard" ], "c3d-": [ "pd-standard", "pd-extreme", "hyperdisk-extreme" ], "n4-": [ "local-ssd", "pd-standard", "pd-balanced", "pd-ssd", "pd-extreme", "hyperdisk-extreme", "hyperdisk-ml", "hyperdisk-throughput" ], "n2-": [ "hyperdisk-balanced", "hyperdisk-ml" ], "n2d-": [ "pd-extreme", "hyperdisk-ml", "hyperdisk-balanced", "hyperdisk-extreme" ], "n1-": [ "pd-extreme", "hyperdisk-extreme", "hyperdisk-ml", "hyperdisk-throughput", "hyperdisk-balanced" ], "t2d-": [ "pd-extreme", "local-ssd", "hyperdisk-balanced", "hyperdisk-ml", "hyperdisk-extreme" ], "t2a-": [ "local-ssd", "pd-extreme", "hyperdisk-balanced", "hyperdisk-ml", "hyperdisk-extreme", "hyperdisk-throughput" ], "e2-": [ "local-ssd", "pd-extreme", "hyperdisk-balanced", "hyperdisk-ml", "hyperdisk-extreme", "hyperdisk-throughput" ], "z3-": [ "pd-extreme", "pd-standard", "hyperdisk-balanced", "hyperdisk-ml" ], "h3-": [ "local-ssd", "pd-standard", "pd-ssd", "pd-extreme", "hyperdisk-ml", "hyperdisk-extreme" ], "c2-": [ "pd-extreme", "hyperdisk-balanced", "hyperdisk-extreme", "hyperdisk-ml", "hyperdisk-throughput" ], "c2d-": [ "pd-extreme", "hyperdisk-balanced", "hyperdisk-extreme", "hyperdisk-ml", "hyperdisk-throughput" ], "x4-": [ "local-ssd", "pd-ssd", "pd-standard", "pd-balanced", "pd-extreme", "hyperdisk-ml", "hyperdisk-throughput" ], "m3-": [ "hyperdisk-throughput", "hyperdisk-ml", "pd-standard" ], "m2-": [ "local-ssd", "hyperdisk-ml", "hyperdisk-throughput" ], "m1-": [ "hyperdisk-ml", "hyperdisk-throughput" ], "n1-": [ "pd-extreme", "hyperdisk-balanced", "hyperdisk-ml", "hyperdisk-extreme", "hyperdisk-throughput" ], "a3-": [ "pd-extreme", "pd-standard", "hyperdisk-balanced" ], "a2-": [ "pd-extreme", "hyperdisk-throughput", "hyperdisk-balanced", "hyperdisk-extreme" ], "g2-": [ "pd-extreme", "pd-standard", "hyperdisk-balanced", "hyperdisk-extreme" ] } def get_invalid_disk_types(machine_type_name): family = machine_type_name.split("-")[0] + "-" return invalid_disk_types.get(family, []) data = { mt["name"]: { "name": mt["name"], "family": mt["name"].split("-")[0], "memory": mt["memoryMb"], "vCPU": mt["guestCpus"], "arch": _get_arch_for_node_type_gcp(mt["name"]), "accelerators": { acc["guestAcceleratorType"]: { "min_count": acc["guestAcceleratorCount"], "max_count": acc["guestAcceleratorCount"], } for acc in mt.get("accelerators", []) }, "invalid_disk_types": get_invalid_disk_types(mt["name"]) } for mt in resp["items"] } # Grab the Accelerators accels = ( client.acceleratorTypes() .list(project=project, zone=zone) .execute() ) # Set N1-associated Accelerators n1_accels = { acc["name"]: { "description": acc["description"], "min_count": 0, "max_count": acc["maximumCardsPerInstance"], } for acc in accels.get("items", []) if "nvidia-tesla-a100" not in acc["name"] } for mach in data.keys(): if data[mach]["family"] == "n1": data[mach]["accelerators"] = n1_accels # Fix up description for A100 (or others) elif data[mach]["accelerators"]: for acc_name in data[mach]["accelerators"].keys(): items = [ x for x in accels.get("items", []) if x["name"] == acc_name ] if items: data[mach]["accelerators"][acc_name]["description"] = ( items[0]["description"] ) # logger.info(data) return data def _get_ttl_hash(seconds=3600 * 24): """Return the same value within `seconds` time period. Default to 1 day of caching """ return round(time.time() / seconds) def get_machine_types(cloud_provider, credentials, unused_region, zone): if cloud_provider == "GCP": return _get_gcp_machine_types( credentials, zone, ttl_hash=_get_ttl_hash() ) else: raise Exception(f'Unsupport Cloud Provider "{cloud_provider}"') def _get_arch_ancestry(arch): ancestry = {arch.name} for p in arch.parents: ancestry.update(_get_arch_ancestry(p)) return ancestry def get_common_arch(archs): archs = [archspec.cpu.TARGETS[a] for a in archs] common_arch_set = set.intersection(*[_get_arch_ancestry(a) for a in archs]) if not common_arch_set: return None return max([archspec.cpu.TARGETS[a] for a in common_arch_set]).name def get_arch_ancestry(arch_name): arch = archspec.cpu.TARGETS[arch_name] res = [arch_name] if arch.family != arch: for x in arch.parents: res.extend(get_arch_ancestry(x.name)) return res def get_arch_family(arch): return archspec.cpu.TARGETS[arch].family.name def sort_architectures(arch_names): archs = [archspec.cpu.TARGETS[a] for a in arch_names] return [x.name for x in sorted(archs)] @lru_cache def _get_gcp_region_zone_info( credentials, ttl_hash=None ): # pylint: disable=unused-argument (project, client) = _get_gcp_client(credentials) req = client.zones().list(project=project) results = defaultdict(list) while req is not None: resp = req.execute() for zone in resp["items"]: region = "-".join(zone["name"].split("-")[:-1]) results[region].append(zone["name"]) req = client.zones().list_next( previous_request=req, previous_response=resp ) return results def get_region_zone_info(cloud_provider, credentials): if cloud_provider == "GCP": return _get_gcp_region_zone_info(credentials, ttl_hash=_get_ttl_hash()) else: raise Exception("Unsupported Cloud Provider") def _get_gcp_subnets(credentials): (project, client) = _get_gcp_client(credentials) req = client.subnetworks().listUsable(project=project) results = req.execute() entries = results["items"] subnets = [] for entry in entries: # subnet in the form of https://www.googleapis.com/compute/v1/projects/<project>/regions/<region>/subnetworks/<name> tokens = entry["subnetwork"].split("/") region = tokens[8] subnet = tokens[10] # vpc in the form of https://www.googleapis.com/compute/v1/projects/<project>/global/networks/<name> tokens = entry["network"].split("/") vpc = tokens[9] # cidr in standard form xxx.xxx.xxx.xxx/yy cidr = entry["ipCidrRange"] subnets.append([vpc, region, subnet, cidr]) return subnets def get_subnets(cloud_provider, credentials): if cloud_provider == "GCP": return _get_gcp_subnets(credentials) else: raise Exception("Unsupported Cloud Provider") _gcp_services_list = None _gcp_compute_sku_list = None def _get_gcp_instance_pricing( credentials, region, zone, instance_type, gpu_info=None ): global _gcp_services_list global _gcp_compute_sku_list creds = service_account.Credentials.from_service_account_info( json.loads(credentials) ) catalog = cloud_catalog.CloudCatalogClient(credentials=creds) # Step one: Find the Compute Engine service if not _gcp_services_list: _gcp_services_list = [ x for x in catalog.list_services() if "Compute Engine" == x.display_name ] services = _gcp_services_list if len(services) != 1: raise Exception("Did not find Compute Engine Service") # Step two: Get all the SKUs associated with the Compute Engine service if not _gcp_compute_sku_list: _gcp_compute_sku_list = list(catalog.list_skus(parent=services[0].name)) skus = [x for x in _gcp_compute_sku_list if region in x.service_regions] # To zero'th degree, pricing for an instance is made up of: # # cores * Price/PerCore of instance semi-family # # GB RAM * Price/GBhr of instance semi-family # <OTHER THINGS - local SSD, GPUs, Tier 1 networking> THESE ARE TODO # # Disk Storage - Just assume a 20GB disk - that's what we currently get # Google's Billing API has SKUs, but the SKUs don't map to anything - you # can't get SKU info from the actual products. We have to look up sku's # with pricing info, and try to map the SKU's description to the actual # Compute infrastructure we're using. We do have to look at the # "description" field, which feels hazardous and liable to change def price_expr_to_unit_price(expr): """Convert a "Price Expression" to a unit (hourly) price""" unit = expr.tiered_rates[0].unit_price return unit.units + (unit.nanos * 1e-9) def get_disk_price(disk_size, skus): def disk_sku_filter(elem): if elem.category.resource_family != "Storage": return False if elem.category.resource_group != "PDStandard": return False if region not in elem.service_regions: return False if not elem.description.startswith("Storage PD Capacity"): # Filter out 'Regional Storage PD Capacity...' return False return True disk_sku = [x for x in skus if disk_sku_filter(x)] if len(disk_sku) != 1: raise Exception("Failed to find singular appropriate disk") disk_price_expression = disk_sku[0].pricing_info[0].pricing_expression unit_price = price_expr_to_unit_price(disk_price_expression) disk_cost_per_month = disk_size * unit_price disk_cost_per_hr = disk_cost_per_month / (24 * 30) return disk_cost_per_hr def get_cpu_price(num_cores, instance_type, skus): instance_description_mapper = { "e2": "E2 Instance Core", "n2d": "N2D AMD Instance Core", "h3": "Compute optimized Core", "c3": "Compute optimized Core", "c4": "Compute optimized Core", "c2": "Compute optimized Core", "c2d": "C2D AMD Instance Core", "c3d": "C3D AMD Instance Core", "t2d": "T2D AMD Instance Core", "a2": "A2 Instance Core", "m1": "Memory-optimized Instance Core", # ?? "m2": "Memory Optimized Upgrade Premium for Memory-optimized Instance Core", # pylint: disable=line-too-long "m3": "Memory-optimized Instance Core", "n2": "N2 Instance Core", "n1": "Custom Instance Core", # ?? } instance_class = instance_type.split("-")[0] if instance_class not in instance_description_mapper: raise NotImplementedError( "Do not yet have a price mapping for instance type " f"{instance_type}" ) def cpu_sku_filter(elem): if elem.category.resource_family != "Compute": return False if elem.category.resource_group != "CPU": return False if elem.category.usage_type != "OnDemand": return False if region not in elem.service_regions: return False if "Sole Tenancy" in elem.description: return False if not elem.description.startswith( instance_description_mapper[instance_class] ): return False return True cpu_sku = [x for x in skus if cpu_sku_filter(x)] if len(cpu_sku) != 1: raise Exception("Failed to find singular appropriate cpu billing") cpu_price_expression = cpu_sku[0].pricing_info[0].pricing_expression unit_price = price_expr_to_unit_price(cpu_price_expression) cpu_price_per_hr = num_cores * unit_price return cpu_price_per_hr def get_mem_price(num_gb, instance_type, skus): instance_description_mapper = { "e2": "E2 Instance Ram", "n2d": "N2D AMD Instance Ram", "c2": "Compute optimized Ram", "c3": "Compute optimized Ram", "h3": "Compute optimized Ram", "c2d": "C2D AMD Instance Ram", "c3d": "C3D AMD Instance Ram", "c4": "C4 Instance RAM", "t2d": "T2D AMD Instance Ram", "a2": "A2 Instance Ram", "m1": "Memory-optimized Instance Ram", "m2": "Memory-optimized Instance Ram", "m3": "Memory-optimized Instance Ram", # ?? "n2": "N2 Instance Ram", "n1": "Custom Instance Ram", # ?? } # TODO: Deal with 'Extended Instance Ram' instance_class = instance_type.split("-")[0] if instance_class not in instance_description_mapper: raise NotImplementedError( "Do not yet have a price mapping for instance type " f"{instance_type}" ) def mem_sku_filter(elem): if elem.category.resource_family != "Compute": return False if elem.category.resource_group != "RAM": return False if elem.category.usage_type != "OnDemand": return False if region not in elem.service_regions: return False if "Sole Tenancy" in elem.description: return False if not elem.description.startswith( instance_description_mapper[instance_class] ): return False return True mem_sku = [x for x in skus if mem_sku_filter(x)] if len(mem_sku) != 1: raise Exception("Failed to find singular appropriate RAM billing") mem_price_expression = mem_sku[0].pricing_info[0].pricing_expression unit_price = price_expr_to_unit_price(mem_price_expression) ram_price_per_hr = num_gb * unit_price return ram_price_per_hr def get_accel_price(gpu_description, gpu_count, skus): def gpu_sku_filter(elem): if elem.category.resource_family != "Compute": return False if elem.category.resource_group != "GPU": return False if elem.category.usage_type != "OnDemand": return False return elem.description.lower().startswith( gpu_description.lower()) gpu_sku = [x for x in skus if gpu_sku_filter(x)] if len(gpu_sku) != 1: raise Exception("Failed to find singular appropriate GPU billing") gpu_price_expression = gpu_sku[0].pricing_info[0].pricing_expression unit_price = price_expr_to_unit_price(gpu_price_expression) gpu_price_per_hr = gpu_count * unit_price return gpu_price_per_hr machine = _get_gcp_machine_types(credentials, zone)[instance_type] instance_price = ( get_cpu_price(machine["vCPU"], instance_type, skus) + get_mem_price(machine["memory"] / 1024, instance_type, skus) # TODO: Actual disk size (20 is GHPC default) + get_disk_price(20.0, skus) ) if gpu_info: (gpu_name, gpu_count) = gpu_info if gpu_count: # Need to map GPU name to GPU description for Pricing API try: gpu_desc = machine["accelerators"][gpu_name]["description"] instance_price += get_accel_price(gpu_desc, gpu_count, skus) except KeyError as err: raise Exception( "Failed to map accelerator to instance" ) from err return instance_price def get_instance_pricing( cloud_provider, credentials, region, zone, instance_type, gpu_info=None ): """Return price per hour for an instance""" if cloud_provider == "GCP": return _get_gcp_instance_pricing( credentials, region, zone, instance_type, gpu_info ) else: raise Exception(f'Unsupported Cloud Provider "{cloud_provider}"') def gcs_apply_bucket_acl( bucket, account, permission="roles/storage.objectViewer" ): logger.info( "Attempting to grant %s to gs://%s/ for user %s", permission, bucket, account, ) client = gcs.Client() try: gcs_bucket = client.get_bucket(bucket) policy = gcs_bucket.get_iam_policy() for binding in policy.bindings: if binding["role"] == permission: binding["members"].add(account) break else: policy.bindings.append( {"role": permission, "members": set(account)} ) gcs_bucket.set_iam_policy(policy) # Myriad errors could occur, none of them handleable so just log and move on except Exception as err: # pylint: disable=broad-except logger.error("Failed to apply GCS Policy", exc_info=err) def gcs_upload_file(bucket, path, contents, extra_acl=None): extra_acl = extra_acl if extra_acl else [] logger.info( "Attempting to upload to gs://%s/%s", bucket, path if path else "" ) client = gcs.Client() gcs_bucket = client.bucket(bucket) blob = gcs_bucket.blob(path) blob.upload_from_string(contents) for acl in extra_acl: user = acl.get("user", None) permission = acl.get("permission", None) if user and permission: if permission in ["OWNER", "READER", "WRITER"]: blob.acl.user(user).grant(permission) blob.acl.save() client.close() def gcs_fetch_file(bucket, paths): client = gcs.Client() gcs_bucket = client.bucket(bucket) results = {} for path in paths: try: logger.debug( "Attempting to download from gs://%s/%s", bucket, path if path else "", ) blob = gcs_bucket.blob(path) results[path] = blob.download_as_text() except google.cloud.exceptions.NotFound as nf: logger.info( "Attempt failed (Not Found) to download {path}", exc_info=nf ) client.close() return results def gcs_get_blob(bucket, path): """Returns a blob object - it may or may not exist""" client = gcs.Client() gcs_bucket = client.bucket(bucket) return gcs_bucket.blob(path) def get_gcp_workbench_region_zone_info( credentials, service="notebooks", api_version="v1" ): (project, nb) = _get_gcp_client(credentials, service, api_version) request = nb.projects().locations().list(name=f"projects/{project}") result = request.execute() locations = [x["locationId"] for x in result["locations"]] return locations def get_gcp_filestores(credentials): """Returns an array of Filestore instance information E.g. [ {'createTime': ..., 'fileShares': [{'capacityGb': '2660', 'name': 'data'}], 'name': 'projects/<project>/locations/<zone>/instances/<name>', 'networks': [ {'ipAddresses': ['10.241.201.242'], 'modes': ['MODE_IPV4'], 'network': '<network-name>', 'reservedIpRange': '10.241.201.240/29' } ], 'state': 'READY', 'tier': 'PREMIUM' }, ... ] """ (project, client) = _get_gcp_client(credentials, "file", "v1") request = ( client.projects() .locations() .instances() .list(parent=f"projects/{project}/locations/-") ) result = request.execute() return result["instances"]