in community/modules/scripts/htcondor-install/files/autoscaler.py [0:0]
def scale(self):
    """Resize the Managed Instance Group (MIG) to fit the HTCondor job queue.

    The method:

    1. sums ``RequestCpus`` over idle jobs (``JobStatus == 1``) that were
       queued before the last negotiation cycle and whose ``RequireId``
       names this MIG;
    2. converts that demand into a minimum host count using the CPU count
       returned by ``self.getInstanceTemplateInfo()``;
    3. collects the VMs that may be deleted: instances the MIG is currently
       modifying (unless the collector reports them as claimed), followed
       by idle execute points;
    4. computes the new target size, capped at
       ``self.compute_instance_limit``, and either deletes surplus VMs via
       ``self.deleteFromMig`` or resizes the MIG upward.

    Side effects: sets ``self.zoneargs``, ``self.is_spot``,
    ``self.cores_per_node`` and ``self.size``; prints progress to stdout.

    Raises:
        SystemExit: when the pool does not report exactly one negotiator,
            when no negotiation cycle has started yet, or when the MIG is
            already at the computed target size.
    """
    # Function-scope import: sys.exit() replaces the site-provided exit(),
    # which is intended for interactive use and is missing under `python -S`.
    import sys

    # diagnosis
    if self.debug > 1:
        print("Launching autoscaler.py with the following arguments:")
        print(f"project_id: {self.project}")
        print(f"zone: {self.zone}")
        print(f"region: {self.region}")
        print(f"multizone: {self.multizone}")
        print(f"group_manager: {self.instance_group_manager}")
        print(f"computeinstancelimit: {self.compute_instance_limit}")
        print(f"debuglevel: {self.debug}")

    # Regional (multizone) MIGs are addressed by region, zonal ones by zone;
    # these keyword arguments are splatted into every MIG API call below.
    if self.multizone:
        self.zoneargs = {"region": self.region}
    else:
        self.zoneargs = {"zone": self.zone}

    # Each HTCondor scheduler (SchedD), maintains a list of jobs under its
    # stewardship. A full list of Job ClassAd attributes can be found at
    # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html
    schedd = htcondor.Schedd()
    # encourage the job queue to start a new negotiation cycle; there are
    # internal unconfigurable rate limits so not guaranteed; this is not
    # strictly required for success, but may reduce latency of autoscaling
    schedd.reschedule()

    REQUEST_CPUS_ATTRIBUTE = "RequestCpus"
    REQUEST_GPUS_ATTRIBUTE = "RequestGpus"
    REQUEST_MEMORY_ATTRIBUTE = "RequestMemory"
    job_attributes = [
        REQUEST_CPUS_ATTRIBUTE,
        REQUEST_GPUS_ATTRIBUTE,
        REQUEST_MEMORY_ATTRIBUTE,
    ]

    instanceTemplateInfo = self.getInstanceTemplateInfo()
    self.is_spot = instanceTemplateInfo["is_spot"]
    self.cores_per_node = instanceTemplateInfo["guest_cpus"]
    print(f"MIG is configured for Spot pricing: {self.is_spot}")
    print(f"Number of CPU per compute node: {self.cores_per_node}")

    # this query constrains the search to jobs whose RequireId attribute
    # names the MIG handled by this autoscaler
    mig_job_query = classad.ExprTree(f"RequireId == \"{self.instance_group_manager}\"")

    # For purpose of scaling a Managed Instance Group, count only jobs that
    # are idle and likely participated in a negotiation cycle (there does
    # not appear to be a single classad attribute for this).
    # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#JobStatus
    LAST_CYCLE_ATTRIBUTE = "LastNegotiationCycleTime0"
    coll = htcondor.Collector()
    negotiator_ad = coll.query(htcondor.AdTypes.Negotiator, projection=[LAST_CYCLE_ATTRIBUTE])
    if len(negotiator_ad) != 1:
        print(f"There should be exactly 1 negotiator in the pool. There is {len(negotiator_ad)}")
        sys.exit()
    last_negotiation_cycle_time = negotiator_ad[0].get(LAST_CYCLE_ATTRIBUTE)
    if not last_negotiation_cycle_time:
        print("The negotiator has not yet started a match cycle. Exiting auto-scaling.")
        sys.exit()

    print(f"Last negotiation cycle occurred at: {datetime.fromtimestamp(last_negotiation_cycle_time)}")
    idle_job_query = classad.ExprTree(f"JobStatus == 1 && QDate < {last_negotiation_cycle_time}")
    idle_job_ads = schedd.query(constraint=idle_job_query.and_(mig_job_query),
                                projection=job_attributes)

    total_idle_request_cpus = sum(j[REQUEST_CPUS_ATTRIBUTE] for j in idle_job_ads)
    print(f"Total CPUs requested by idle jobs: {total_idle_request_cpus}")

    if self.debug > 1:
        print("Information about the compute instance template")
        pprint(instanceTemplateInfo)

    # Calculate the minimum number of instances that, for fully packed
    # execute points, could satisfy current job queue
    min_hosts_for_idle_jobs = math.ceil(total_idle_request_cpus / self.cores_per_node)
    if self.debug > 0:
        print(f"Minimum hosts needed: {total_idle_request_cpus} / {self.cores_per_node} = {min_hosts_for_idle_jobs}")

    # Get current number of instances in the MIG
    requestGroupInfo = self.instanceGroupManagers.get(
        project=self.project,
        **self.zoneargs,
        instanceGroupManager=self.instance_group_manager,
    )
    responseGroupInfo = requestGroupInfo.execute()
    current_target = responseGroupInfo["targetSize"]
    print(f"Current MIG target size: {current_target}")

    # Find instances that are being modified by the MIG (currentAction is
    # any value other than "NONE"). A common reason an instance is modified
    # is because it has failed a health check.
    reqModifyingInstances = self.instanceGroupManagers.listManagedInstances(
        project=self.project,
        **self.zoneargs,
        instanceGroupManager=self.instance_group_manager,
        filter="currentAction != \"NONE\"",
        orderBy="creationTimestamp desc"
    )
    respModifyingInstances = reqModifyingInstances.execute()

    # Find VMs that are idle (no dynamic slots created from partitionable
    # slots) in the MIG handled by this autoscaler
    filter_idle_vms = classad.ExprTree("PartitionableSlot && NumDynamicSlots==0")
    filter_claimed_vms = classad.ExprTree("PartitionableSlot && NumDynamicSlots>0")
    filter_mig = classad.ExprTree(f"regexp(\".*/{self.instance_group_manager}$\", CloudCreatedBy)")

    # A full list of Machine (StartD) ClassAd attributes can be found at
    # https://htcondor.readthedocs.io/en/latest/classad-attributes/machine-classad-attributes.html
    NODENAME_ATTRIBUTE = "Machine"
    ZONE_ATTRIBUTE = "CloudZone"
    idle_node_ads = coll.query(htcondor.AdTypes.Startd,
                               constraint=filter_idle_vms.and_(filter_mig),
                               projection=[NODENAME_ATTRIBUTE, ZONE_ATTRIBUTE])
    claimed_node_ads = coll.query(htcondor.AdTypes.Startd,
                                  constraint=filter_claimed_vms.and_(filter_mig),
                                  projection=[NODENAME_ATTRIBUTE])
    # Short host names (text before the first dot); a set because it is only
    # used for membership tests.
    claimed_nodes = {ad[NODENAME_ATTRIBUTE].split(".")[0] for ad in claimed_node_ads}

    # treat OrderedDict as a set by ignoring key values; this set will
    # contain VMs we would consider deleting, in inverse order of
    # their readiness to join pool (creating, unhealthy, healthy+idle)
    idle_nodes = OrderedDict()

    # The "managedInstances" key is absent when nothing matches the filter.
    modifyingInstances = respModifyingInstances.get("managedInstances", [])
    print(f"There are {len(modifyingInstances)} VMs being modified by the managed instance group")

    # there is potential for nodes in MIG health check "VERIFYING" state
    # to have already joined the pool and be running jobs
    for instance in modifyingInstances:
        self_link = instance["instance"]
        node_name = self_link.rsplit("/", 1)[-1]
        if node_name not in claimed_nodes:
            idle_nodes[self_link] = "modifying"

    for ad in idle_node_ads:
        node = ad[NODENAME_ATTRIBUTE].split(".")[0]
        zone = ad[ZONE_ATTRIBUTE]
        self_link = (
            "https://www.googleapis.com/compute/v1/projects/"
            f"{self.project}/zones/{zone}/instances/{node}"
        )
        # there is potential for nodes in MIG health check "VERIFYING" state
        # to have already joined the pool and be idle; delete them last
        if self_link in idle_nodes:
            idle_nodes.move_to_end(self_link)
        idle_nodes[self_link] = "idle"

    n_idle = len(idle_nodes)
    print(f"There are {n_idle} VMs being modified or idle in the pool")
    if self.debug > 1:
        print("Listing idle nodes:")
        pprint(idle_nodes)

    # always keep size tending toward the minimum idle VMs requested
    # NOTE(review): nothing here prevents new_target from going negative if
    # n_idle exceeds the other terms -- confirm whether that can occur.
    new_target = current_target + self.compute_instance_min_idle - n_idle + min_hosts_for_idle_jobs
    if new_target > self.compute_instance_limit:
        self.size = self.compute_instance_limit
        print(f"MIG target size will be limited by {self.compute_instance_limit}")
    else:
        self.size = new_target

    print(f"New MIG target size: {self.size}")

    if self.debug > 1:
        print("MIG Information:")
        print(responseGroupInfo)

    if self.size == current_target:
        if current_target == 0:
            print("Queue is empty")
        print("Running correct number of VMs to handle queue")
        sys.exit()

    if self.size < current_target:
        print("Scaling down. Looking for nodes that can be shut down")
        if self.debug > 1:
            print("Compute node busy status:")
            for node in idle_nodes:
                print(node)

        # Shut down idle nodes up to our calculated limit
        nodes_to_delete = list(idle_nodes)[:current_target - self.size]
        if nodes_to_delete:
            for node in nodes_to_delete:
                print(f"Attempting to delete: {node.rsplit('/',1)[-1]}")
            self.deleteFromMig(nodes_to_delete)
        else:
            # Reached when the instance limit forces a scale-down while no
            # VM is idle or being modified; avoid an empty delete request.
            print("No idle or modifying VMs are available to delete")

        if self.debug > 1:
            print("Scaling down complete")

    elif self.size > current_target:
        print(f"Scaling up. Need to increase number of instances to {self.size}")
        # Request to resize
        request = self.instanceGroupManagers.resize(
            project=self.project,
            **self.zoneargs,
            instanceGroupManager=self.instance_group_manager,
            size=self.size,
        )
        response = request.execute()
        if self.debug > 1:
            print("Requesting to increase MIG size")
            pprint(response)
            print("Scaling up complete")