def scale()

in community/modules/scripts/htcondor-install/files/autoscaler.py [0:0]


    def scale(self):
        # diagnosis
        if self.debug > 1:
            print("Launching autoscaler.py with the following arguments:")
            print("project_id: " + self.project)
            print("zone: " + self.zone)
            print("region: " + self.region)
            print(f"multizone: {self.multizone}")
            print("group_manager: " + self.instance_group_manager)
            print("computeinstancelimit: " + str(self.compute_instance_limit))
            print("debuglevel: " + str(self.debug))

        if self.multizone:
            self.zoneargs = {"region": self.region}
        else:
            self.zoneargs = {"zone": self.zone}

        # Each HTCondor scheduler (SchedD) maintains a list of jobs under its
        # stewardship. A full list of Job ClassAd attributes can be found at
        # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html
        schedd = htcondor.Schedd()
        # encourage the job queue to start a new negotiation cycle; there are
        # internal unconfigurable rate limits so not guaranteed; this is not
        # strictly required for success, but may reduce latency of autoscaling
        schedd.reschedule()
        REQUEST_CPUS_ATTRIBUTE = "RequestCpus"
        REQUEST_GPUS_ATTRIBUTE = "RequestGpus"
        REQUEST_MEMORY_ATTRIBUTE = "RequestMemory"
        job_attributes = [
            REQUEST_CPUS_ATTRIBUTE,
            REQUEST_GPUS_ATTRIBUTE,
            REQUEST_MEMORY_ATTRIBUTE,
        ]
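        # passing these names as a projection limits each returned job ClassAd
        # to just the attributes needed for the sizing calculations below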

        instanceTemplateInfo = self.getInstanceTemplateInfo()
        self.is_spot = instanceTemplateInfo["is_spot"]
        self.cores_per_node = instanceTemplateInfo["guest_cpus"]
        print(f"MIG is configured for Spot pricing: {self.is_spot}")
        print("Number of CPU per compute node: " + str(self.cores_per_node))

        # this query will constrain the search for jobs to those that either
        # require spot VMs or do not require Spot VMs based on whether the
        # VM instance template is configured for Spot pricing
        spot_query = classad.ExprTree(f"RequireId == \"{self.instance_group_manager}\"")

        # For purpose of scaling a Managed Instance Group, count only jobs that
        # are idle and likely participated in a negotiation cycle (there does
        # not appear to be a single classad attribute for this).
        # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#JobStatus
        LAST_CYCLE_ATTRIBUTE = "LastNegotiationCycleTime0"
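        # LastNegotiationCycleTime0 is the epoch timestamp at which the most
        # recent negotiation cycle began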
        coll = htcondor.Collector()
        negotiator_ad = coll.query(htcondor.AdTypes.Negotiator, projection=[LAST_CYCLE_ATTRIBUTE])
        if len(negotiator_ad) != 1:
            print(f"There should be exactly 1 negotiator in the pool. There is {len(negotiator_ad)}")
            exit()
        last_negotiation_cycle_time = negotiator_ad[0].get(LAST_CYCLE_ATTRIBUTE)
        if not last_negotiation_cycle_time:
            print(f"The negotiator has not yet started a match cycle. Exiting auto-scaling.")
            exit()

        print(f"Last negotiation cycle occurred at: {datetime.fromtimestamp(last_negotiation_cycle_time)}")
        idle_job_query = classad.ExprTree(f"JobStatus == 1 && QDate < {last_negotiation_cycle_time}")
        idle_job_ads = schedd.query(constraint=idle_job_query.and_(spot_query),
                                    projection=job_attributes)
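        # e.g. the combined constraint resembles:
        #   (JobStatus == 1 && QDate < 1700000000) && (RequireSpot == False)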

        total_idle_request_cpus = sum(j[REQUEST_CPUS_ATTRIBUTE] for j in idle_job_ads)
        print(f"Total CPUs requested by idle jobs: {total_idle_request_cpus}")

        if self.debug > 1:
            print("Information about the compute instance template")
            pprint(instanceTemplateInfo)

        # Calculate the minimum number of instances that, for fully packed
        # execute points, could satisfy current job queue
        min_hosts_for_idle_jobs = math.ceil(total_idle_request_cpus / self.cores_per_node)
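        # e.g. 10 idle CPUs requested on 4-core nodes: ceil(10 / 4) = 3 hosts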
        if self.debug > 0:
            print(f"Minimum hosts needed: {total_idle_request_cpus} / {self.cores_per_node} = {min_hosts_for_idle_jobs}")

        # Get current number of instances in the MIG
        requestGroupInfo = self.instanceGroupManagers.get(
            project=self.project,
            **self.zoneargs,
            instanceGroupManager=self.instance_group_manager,
        )
        responseGroupInfo = requestGroupInfo.execute()
        current_target = responseGroupInfo["targetSize"]
        print(f"Current MIG target size: {current_target}")

        # Find instances that are being modified by the MIG (currentAction is
        # any value other than "NONE"). A common reason an instance is being
        # modified is that it has failed a health check.
        reqModifyingInstances = self.instanceGroupManagers.listManagedInstances(
            project=self.project,
            **self.zoneargs,
            instanceGroupManager=self.instance_group_manager,
            filter="currentAction != \"NONE\"",
            orderBy="creationTimestamp desc"
        )
        respModifyingInstances = reqModifyingInstances.execute()
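        # the response may omit the "managedInstances" key when no instances
        # match the filter; the KeyError handler below covers this case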

        # Find VMs that are idle (no dynamic slots created from partitionable
        # slots) in the MIG handled by this autoscaler
        filter_idle_vms = classad.ExprTree("PartitionableSlot && NumDynamicSlots==0")
        filter_claimed_vms = classad.ExprTree("PartitionableSlot && NumDynamicSlots>0")
        filter_mig = classad.ExprTree(f"regexp(\".*/{self.instance_group_manager}$\", CloudCreatedBy)")
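        # CloudCreatedBy identifies the resource that created the VM; the
        # regexp keeps only machines whose value ends with this MIG's name,
        # scoping the query to execute points managed by this autoscaler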
        # A full list of Machine (StartD) ClassAd attributes can be found at
        # https://htcondor.readthedocs.io/en/latest/classad-attributes/machine-classad-attributes.html
        idle_node_ads = coll.query(htcondor.AdTypes.Startd,
            constraint=filter_idle_vms.and_(filter_mig),
            projection=["Machine", "CloudZone"])

        NODENAME_ATTRIBUTE = "Machine"
        claimed_node_ads = coll.query(htcondor.AdTypes.Startd,
            constraint=filter_claimed_vms.and_(filter_mig),
            projection=[NODENAME_ATTRIBUTE])
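        # strip the domain from each machine name to get the bare instance name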
        claimed_nodes = [ad[NODENAME_ATTRIBUTE].split(".")[0] for ad in claimed_node_ads]

        # treat this OrderedDict as an ordered set keyed by VM self-link (the
        # values are informational only); it holds VMs we would consider
        # deleting, in inverse order of their readiness to join the pool
        # (creating, unhealthy, healthy+idle)
        idle_nodes = OrderedDict()
        try:
            modifyingInstances = respModifyingInstances["managedInstances"]
        except KeyError:
            modifyingInstances = []

        print(f"There are {len(modifyingInstances)} VMs being modified by the managed instance group")

        # there is potential for nodes in MIG health check "VERIFYING" state
        # to have already joined the pool and be running jobs
        for instance in modifyingInstances:
            self_link = instance["instance"]
            node_name = self_link.rsplit("/", 1)[-1]
            if node_name not in claimed_nodes:
                idle_nodes[self_link] = "modifying"

        for ad in idle_node_ads:
            node = ad["Machine"].split(".")[0]
            zone = ad["CloudZone"]
            self_link = "https://www.googleapis.com/compute/v1/projects/" + \
                self.project + "/zones/" + zone + "/instances/" + node
            # there is potential for nodes in MIG health check "VERIFYING" state
            # to have already joined the pool and be idle; delete them last
            if self_link in idle_nodes:
                idle_nodes.move_to_end(self_link)
            idle_nodes[self_link] = "idle"
        n_idle = len(idle_nodes)

        print(f"There are {n_idle} VMs being modified or idle in the pool")
        if self.debug > 1:
            print("Listing idle nodes:")
            pprint(idle_nodes)

        # always keep size tending toward the minimum idle VMs requested
        new_target = current_target + self.compute_instance_min_idle - n_idle + min_hosts_for_idle_jobs
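        # e.g. current_target=4, compute_instance_min_idle=2, n_idle=1,
        # min_hosts_for_idle_jobs=3 gives a new target of 4 + 2 - 1 + 3 = 8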
        if new_target > self.compute_instance_limit:
            self.size = self.compute_instance_limit
            print(f"MIG target size will be limited by {self.compute_instance_limit}")
        else:
            self.size = new_target

        print(f"New MIG target size: {self.size}")

        if self.debug > 1:
            print("MIG Information:")
            print(responseGroupInfo)

        if self.size == current_target:
            if current_target == 0:
                print("Queue is empty")
            print("Running correct number of VMs to handle queue")
            exit()

        if self.size < current_target:
            print("Scaling down. Looking for nodes that can be shut down")

            if self.debug > 1:
                print("Compute node busy status:")
                for node in idle_nodes:
                    print(node)

            # Shut down idle nodes up to our calculated limit
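            # idle_nodes is ordered least-ready-first (modifying before idle),
            # so deletion candidates are taken from the front of the list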
            nodes_to_delete = list(idle_nodes.keys())[: current_target - self.size]
            for node in nodes_to_delete:
                print(f"Attempting to delete: {node.rsplit('/',1)[-1]}")
            respDel = self.deleteFromMig(nodes_to_delete)

            if self.debug > 1:
                print("Scaling down complete")

        if self.size > current_target:
            print(
                "Scaling up. Need to increase number of instances to " + str(self.size)
            )
            # Request to resize
            request = self.instanceGroupManagers.resize(
                project=self.project,
                **self.zoneargs,
                instanceGroupManager=self.instance_group_manager,
                size=self.size,
            )
            response = request.execute()
            if self.debug > 1:
                print("Requesting to increase MIG size")
                pprint(response)
                print("Scaling up complete")