community/modules/scripts/htcondor-install/files/autoscaler.py

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Script for resizing managed instance group (MIG) cluster size based
# on the number of jobs in the Condor Queue.

from absl import app
from absl import flags
from collections import OrderedDict
from datetime import datetime
from pprint import pprint

from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

import argparse
import os
import math
import time

import htcondor
import classad

parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--p", required=True, help="Project id", type=str)
parser.add_argument(
    "--z",
    required=True,
    help="Name of GCP zone where the managed instance group is located",
    type=str,
)
parser.add_argument(
    "--r",
    required=True,
    help="Name of GCP region where the managed instance group is located",
    type=str,
)
parser.add_argument(
    "--mz",
    required=False,
    help="Enable multizone (regional) managed instance group",
    action="store_true",
)
parser.add_argument(
    "--g", required=True, help="Name of the managed instance group", type=str
)
parser.add_argument(
    "--i", default=0, help="Minimum number of idle compute instances", type=int
)
parser.add_argument(
    "--c", required=True, help="Maximum number of compute instances", type=int
)
parser.add_argument(
    "--v",
    default=0,
    help="Increase output verbosity. 1-show basic debug info. 2-show detailed debug info",
    type=int,
    choices=[0, 1, 2],
)
parser.add_argument(
    "--d",
    default=0,
    help="Dry Run, default=0, if 1, then no scaling actions",
    type=int,
    choices=[0, 1],
)
args = parser.parse_args()
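
# Illustrative invocation (all values are examples only; the project, zone,
# region, and MIG name are taken from the notes in main() below, and the
# instance limit and minimum idle count are hypothetical):
#
#   python3 autoscaler.py --p slurm-var-demo --z us-central1-f --r us-central1 \
#       --g condor-compute-igm --c 20 --i 2
#
# Add --mz when the managed instance group is regional (multizone).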


class AutoScaler:
    def __init__(self, multizone=False):
        self.multizone = multizone
        # Obtain credentials
        self.credentials = GoogleCredentials.get_application_default()
        self.service = discovery.build("compute", "v1", credentials=self.credentials)

        if self.multizone:
            self.instanceGroupManagers = self.service.regionInstanceGroupManagers()
        else:
            self.instanceGroupManagers = self.service.instanceGroupManagers()

    # Remove specified instances from MIG and decrease MIG size
    def deleteFromMig(self, node_self_links):
        requestDelInstance = self.instanceGroupManagers.deleteInstances(
            project=self.project,
            **self.zoneargs,
            instanceGroupManager=self.instance_group_manager,
            body={"instances": node_self_links},
        )

        # execute if not a dry-run
        if not self.dryrun:
            response = requestDelInstance.execute()
            if self.debug > 0:
                pprint(response)
            return response
        return "Dry Run"

    def getInstanceTemplateInfo(self):
        requestTemplateName = self.instanceGroupManagers.get(
            project=self.project,
            **self.zoneargs,
            instanceGroupManager=self.instance_group_manager,
            fields="instanceTemplate",
        )
        responseTemplateName = requestTemplateName.execute()
        template_name = ""

        if self.debug > 1:
            print("Request for the template name")
            pprint(responseTemplateName)

        if len(responseTemplateName) > 0:
            template_url = responseTemplateName.get("instanceTemplate")
            template_url_partitioned = template_url.split("/")
            template_name = template_url_partitioned[len(template_url_partitioned) - 1]

        requestInstanceTemplate = self.service.instanceTemplates().get(
            project=self.project, instanceTemplate=template_name, fields="properties"
        )
        responseInstanceTemplateInfo = requestInstanceTemplate.execute()

        if self.debug > 1:
            print("Template information")
            pprint(responseInstanceTemplateInfo["properties"])

        machine_type = responseInstanceTemplateInfo["properties"]["machineType"]
        is_spot = responseInstanceTemplateInfo["properties"]["scheduling"][
            "preemptible"
        ]
        if self.debug > 0:
            print("Machine Type: " + machine_type)
            print("Is spot: " + str(is_spot))

        request = self.service.machineTypes().get(
            project=self.project, zone=self.zone, machineType=machine_type
        )
        response = request.execute()
        guest_cpus = response["guestCpus"]
        if self.debug > 1:
            print("Machine information")
            pprint(responseInstanceTemplateInfo["properties"])
        if self.debug > 0:
            print("Guest CPUs: " + str(guest_cpus))

        instanceTemplateInfo = {
            "machine_type": machine_type,
            "is_spot": is_spot,
            "guest_cpus": guest_cpus,
        }
        return instanceTemplateInfo
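
    # Example of the mapping returned by getInstanceTemplateInfo() above (the
    # values shown are hypothetical; the keys match the dict constructed in the
    # method): {"machine_type": "c2-standard-4", "is_spot": False, "guest_cpus": 4}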

    def scale(self):
        # diagnosis
        if self.debug > 1:
            print("Launching autoscaler.py with the following arguments:")
            print("project_id: " + self.project)
            print("zone: " + self.zone)
            print("region: " + self.region)
            print(f"multizone: {self.multizone}")
            print("group_manager: " + self.instance_group_manager)
            print("computeinstancelimit: " + str(self.compute_instance_limit))
            print("debuglevel: " + str(self.debug))

        if self.multizone:
            self.zoneargs = {"region": self.region}
        else:
            self.zoneargs = {"zone": self.zone}

        # Each HTCondor scheduler (SchedD) maintains a list of jobs under its
        # stewardship. A full list of Job ClassAd attributes can be found at
        # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html
        schedd = htcondor.Schedd()
        # encourage the job queue to start a new negotiation cycle; there are
        # internal unconfigurable rate limits so this is not guaranteed; it is
        # not strictly required for success, but may reduce latency of autoscaling
        schedd.reschedule()

        REQUEST_CPUS_ATTRIBUTE = "RequestCpus"
        REQUEST_GPUS_ATTRIBUTE = "RequestGpus"
        REQUEST_MEMORY_ATTRIBUTE = "RequestMemory"
        job_attributes = [
            REQUEST_CPUS_ATTRIBUTE,
            REQUEST_GPUS_ATTRIBUTE,
            REQUEST_MEMORY_ATTRIBUTE,
        ]

        instanceTemplateInfo = self.getInstanceTemplateInfo()
        self.is_spot = instanceTemplateInfo["is_spot"]
        self.cores_per_node = instanceTemplateInfo["guest_cpus"]
        print(f"MIG is configured for Spot pricing: {self.is_spot}")
        print("Number of CPU per compute node: " + str(self.cores_per_node))

        # this query constrains the search to jobs whose RequireId attribute
        # matches the managed instance group handled by this autoscaler
        spot_query = classad.ExprTree(f'RequireId == "{self.instance_group_manager}"')

        # For purpose of scaling a Managed Instance Group, count only jobs that
        # are idle and likely participated in a negotiation cycle (there does
        # not appear to be a single classad attribute for this).
        # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#JobStatus
        LAST_CYCLE_ATTRIBUTE = "LastNegotiationCycleTime0"
        coll = htcondor.Collector()
        negotiator_ad = coll.query(
            htcondor.AdTypes.Negotiator, projection=[LAST_CYCLE_ATTRIBUTE]
        )
        if len(negotiator_ad) != 1:
            print(
                f"There should be exactly 1 negotiator in the pool. There is {len(negotiator_ad)}"
            )
            exit()
        last_negotiation_cycle_time = negotiator_ad[0].get(LAST_CYCLE_ATTRIBUTE)
        if not last_negotiation_cycle_time:
            print("The negotiator has not yet started a match cycle. Exiting auto-scaling.")
            exit()
        print(
            f"Last negotiation cycle occurred at: {datetime.fromtimestamp(last_negotiation_cycle_time)}"
        )

        idle_job_query = classad.ExprTree(
            f"JobStatus == 1 && QDate < {last_negotiation_cycle_time}"
        )
        idle_job_ads = schedd.query(
            constraint=idle_job_query.and_(spot_query), projection=job_attributes
        )
        total_idle_request_cpus = sum(j[REQUEST_CPUS_ATTRIBUTE] for j in idle_job_ads)
        print(f"Total CPUs requested by idle jobs: {total_idle_request_cpus}")

        if self.debug > 1:
            print("Information about the compute instance template")
            pprint(instanceTemplateInfo)

        # Calculate the minimum number of instances that, for fully packed
        # execute points, could satisfy the current job queue
        min_hosts_for_idle_jobs = math.ceil(
            total_idle_request_cpus / self.cores_per_node
        )
        if self.debug > 0:
            print(
                f"Minimum hosts needed: {total_idle_request_cpus} / {self.cores_per_node} = {min_hosts_for_idle_jobs}"
            )
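
        # Illustrative arithmetic (hypothetical numbers): if idle jobs request a
        # total of 10 CPUs and each node provides 4 guest CPUs, the queue needs
        # ceil(10 / 4) = 3 fully packed execute points.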

        # Get current number of instances in the MIG
        requestGroupInfo = self.instanceGroupManagers.get(
            project=self.project,
            **self.zoneargs,
            instanceGroupManager=self.instance_group_manager,
        )
        responseGroupInfo = requestGroupInfo.execute()
        current_target = responseGroupInfo["targetSize"]
        print(f"Current MIG target size: {current_target}")

        # Find instances that are being modified by the MIG (currentAction is
        # any value other than "NONE"). A common reason an instance is being
        # modified is that it has failed a health check.
        reqModifyingInstances = self.instanceGroupManagers.listManagedInstances(
            project=self.project,
            **self.zoneargs,
            instanceGroupManager=self.instance_group_manager,
            filter='currentAction != "NONE"',
            orderBy="creationTimestamp desc",
        )
        respModifyingInstances = reqModifyingInstances.execute()

        # Find VMs that are idle (no dynamic slots created from partitionable
        # slots) in the MIG handled by this autoscaler
        filter_idle_vms = classad.ExprTree("PartitionableSlot && NumDynamicSlots==0")
        filter_claimed_vms = classad.ExprTree("PartitionableSlot && NumDynamicSlots>0")
        filter_mig = classad.ExprTree(
            f'regexp(".*/{self.instance_group_manager}$", CloudCreatedBy)'
        )

        # A full list of Machine (StartD) ClassAd attributes can be found at
        # https://htcondor.readthedocs.io/en/latest/classad-attributes/machine-classad-attributes.html
        idle_node_ads = coll.query(
            htcondor.AdTypes.Startd,
            constraint=filter_idle_vms.and_(filter_mig),
            projection=["Machine", "CloudZone"],
        )

        NODENAME_ATTRIBUTE = "Machine"
        claimed_node_ads = coll.query(
            htcondor.AdTypes.Startd,
            constraint=filter_claimed_vms.and_(filter_mig),
            projection=[NODENAME_ATTRIBUTE],
        )
        claimed_nodes = [ad[NODENAME_ATTRIBUTE].split(".")[0] for ad in claimed_node_ads]

        # treat OrderedDict as a set by ignoring key values; this set will
        # contain VMs we would consider deleting, in inverse order of
        # their readiness to join the pool (creating, unhealthy, healthy+idle)
        idle_nodes = OrderedDict()

        try:
            modifyingInstances = respModifyingInstances["managedInstances"]
        except KeyError:
            modifyingInstances = []
        print(
            f"There are {len(modifyingInstances)} VMs being modified by the managed instance group"
        )

        # there is potential for nodes in MIG health check "VERIFYING" state
        # to have already joined the pool and be running jobs
        for instance in modifyingInstances:
            self_link = instance["instance"]
            node_name = self_link.rsplit("/", 1)[-1]
            if node_name not in claimed_nodes:
                idle_nodes[self_link] = "modifying"

        for ad in idle_node_ads:
            node = ad["Machine"].split(".")[0]
            zone = ad["CloudZone"]
            self_link = (
                "https://www.googleapis.com/compute/v1/projects/"
                + self.project
                + "/zones/"
                + zone
                + "/instances/"
                + node
            )
            # there is potential for nodes in MIG health check "VERIFYING" state
            # to have already joined the pool and be idle; delete them last
            if self_link in idle_nodes:
                idle_nodes.move_to_end(self_link)
            idle_nodes[self_link] = "idle"

        n_idle = len(idle_nodes)
        print(f"There are {n_idle} VMs being modified or idle in the pool")
        if self.debug > 1:
            print("Listing idle nodes:")
            pprint(idle_nodes)
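
        # Illustrative arithmetic for the target-size formula below (hypothetical
        # numbers): with current_target=5, a requested minimum of 2 idle VMs,
        # 4 VMs currently modifying or idle, and 3 hosts needed for idle jobs,
        # the new target is 5 + 2 - 4 + 3 = 6.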

        # always keep size tending toward the minimum idle VMs requested
        new_target = (
            current_target
            + self.compute_instance_min_idle
            - n_idle
            + min_hosts_for_idle_jobs
        )
        if new_target > self.compute_instance_limit:
            self.size = self.compute_instance_limit
            print(f"MIG target size will be limited by {self.compute_instance_limit}")
        else:
            self.size = new_target

        print(f"New MIG target size: {self.size}")

        if self.debug > 1:
            print("MIG Information:")
            print(responseGroupInfo)

        if self.size == current_target:
            if current_target == 0:
                print("Queue is empty")
            print("Running correct number of VMs to handle queue")
            exit()

        if self.size < current_target:
            print("Scaling down. Looking for nodes that can be shut down")
            if self.debug > 1:
                print("Compute node busy status:")
                for node in idle_nodes:
                    print(node)

            # Shut down idle nodes up to our calculated limit
            nodes_to_delete = list(idle_nodes.keys())[0 : current_target - self.size]
            for node in nodes_to_delete:
                print(f"Attempting to delete: {node.rsplit('/', 1)[-1]}")
            respDel = self.deleteFromMig(nodes_to_delete)
            if self.debug > 1:
                print("Scaling down complete")

        if self.size > current_target:
            print(
                "Scaling up. Need to increase number of instances to " + str(self.size)
            )
            # Request to resize
            request = self.instanceGroupManagers.resize(
                project=self.project,
                **self.zoneargs,
                instanceGroupManager=self.instance_group_manager,
                size=self.size,
            )
            response = request.execute()
            if self.debug > 1:
                print("Requesting to increase MIG size")
                pprint(response)
            print("Scaling up complete")


def main():
    scaler = AutoScaler(args.mz)

    # Project ID
    scaler.project = args.p  # Ex: 'slurm-var-demo'

    # Name of the zone where the managed instance group is located
    scaler.zone = args.z  # Ex: 'us-central1-f'

    # Name of the region where the managed instance group is located
    scaler.region = args.r  # Ex: 'us-central1'

    # The name of the managed instance group
    scaler.instance_group_manager = args.g  # Ex: 'condor-compute-igm'

    # Default number of cores per instance, will be replaced with the actual value
    scaler.cores_per_node = 4

    # Default number of running instances that the managed instance group should
    # maintain at any given time. This number will go up and down based on the
    # load (number of jobs in the queue)
    scaler.size = 0

    scaler.compute_instance_min_idle = args.i

    # Dry run: 0, run scaling; 1, only provide info
    scaler.dryrun = args.d > 0

    # Debug level: 1 - print debug information, 2 - print detailed debug information
    scaler.debug = 0
    if args.v:
        scaler.debug = args.v

    # Limit for the maximum number of compute instances. If zero (default setting),
    # no limit will be enforced by the script
    scaler.compute_instance_limit = 0
    if args.c:
        scaler.compute_instance_limit = abs(args.c)

    scaler.scale()


if __name__ == "__main__":
    main()