azure-slurm/slurmcc/partition.py

# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
#
import re
from typing import Dict, List, Optional

from hpc.autoscale import hpclogging as logging
from hpc.autoscale import util as hpcutil
from hpc.autoscale.hpctypes import PlacementGroup
from hpc.autoscale.node.bucket import NodeBucket
from hpc.autoscale.node.limits import BucketLimits
from hpc.autoscale.node.nodemanager import NodeManager

from . import util as slutil


class Partition:
    def __init__(
        self,
        name: str,
        nodearray: str,
        nodename_prefix: str,
        machine_type: str,
        is_default: bool,
        is_hpc: bool,
        max_scaleset_size: int,
        buckets: List[NodeBucket],
        max_vm_count: int,
        use_pcpu: bool = False,
        dynamic_feature: Optional[str] = None,
        over_allocation_thresholds: Dict = {},
        nodearray_machine_types: Optional[List[str]] = None,
        dampen_memory: Optional[float] = None,
        gpu_device_config: str = "",
    ) -> None:
        self.name = name
        self.nodearray = nodearray
        self.nodename_prefix = nodename_prefix
        self.machine_type = machine_type
        nodearray_machine_types = nodearray_machine_types or [self.machine_type]
        self.nodearray_machine_types = [x.lower() for x in nodearray_machine_types]
        self.is_default = is_default
        self.is_hpc = is_hpc
        if not is_hpc:
            assert len(buckets) == 1
        assert len(buckets) > 0
        self.max_scaleset_size = max_scaleset_size
        self.vcpu_count = buckets[0].vcpu_count

        memory_mb = buckets[0].memory.convert_to("m").value
        if dampen_memory is not None:
            if buckets[0].resources.get("slurm_memory"):
                logging.warning(
                    "Because slurm.dampen_memory is defined, the slurm_memory resource"
                    " defined in autoscale.json will be ignored."
                )
            to_decrement = max(1024, memory_mb * dampen_memory)
            self.memory = int(memory_mb - to_decrement)
        else:
            slurm_memory = buckets[0].resources.get("slurm_memory")
            if slurm_memory:
                self.memory = int(slurm_memory.convert_to("m").value)
            else:
                to_decrement = max(1024, memory_mb * 0.05)
                self.memory = int(memory_mb - to_decrement)

        self.max_vm_count = sum([b.max_count for b in buckets])
        self.buckets = buckets
        self.use_pcpu = use_pcpu
        self.dynamic_feature = dynamic_feature
        # cache node_list property for dynamic partitions
        self.__dynamic_node_list_cache = None
        self.node_list_by_pg: Dict[
            Optional[PlacementGroup], List[str]
        ] = _construct_node_list(self)
        self.over_allocation_thresholds = over_allocation_thresholds
        self.gpu_device_config = gpu_device_config
        self.features = []
        if self.dynamic_feature:
            self.features = self.dynamic_feature.split(",")

    def bucket_for_node(self, node_name: str) -> NodeBucket:
        for pg, node_list in self.node_list_by_pg.items():
            if node_name in node_list:
                for bucket in self.buckets:
                    if bucket.placement_group == pg:
                        return bucket
        raise RuntimeError(
            f"Could not find a bucket for node {node_name} in partition {self.name}"
        )

    _SLURM_NODES_CACHE = None

    @classmethod
    def _slurm_nodes(cls) -> List[Dict[str, str]]:
        if not cls._SLURM_NODES_CACHE:
            cls._SLURM_NODES_CACHE = slutil.show_nodes()
        return cls._SLURM_NODES_CACHE

    @property
    def node_list(self) -> str:
        if not self.dynamic_feature:
            static_nodes = self._static_all_nodes()
            if not static_nodes:
                return ""
            return slutil.to_hostlist(static_nodes)

        # with dynamic nodes, we only look at those defined in the partition
        if not self.__dynamic_node_list_cache:
            if not slutil.is_slurmctld_up():
                logging.warning(
                    "slurmctld is down; dynamic nodes cannot be queried at this time."
                )
                return ""
            ret: List[str] = []
            all_slurm_nodes = Partition._slurm_nodes()
            for node in all_slurm_nodes:
                partitions = node.get("Partitions", "").split(",")
                if self.name in partitions:
                    # only include nodes that have the same vm_size declared as a feature
                    features = (node.get("AvailableFeatures") or "").lower().split(",")
                    if self.machine_type.lower() in features:
                        ret.append(node["NodeName"])
                    else:
                        matches_another_vm_size = set(
                            self.nodearray_machine_types
                        ).intersection(set(features))
                        if matches_another_vm_size:
                            # this node has a declared vm_size, but it's not the one
                            # we're looking for.
                            continue

                        # we only use the highest priority vm_size as the default - let
                        # that partition object handle this.
                        if self.machine_type.lower() != self.nodearray_machine_types[0]:
                            continue

                        # anything that starts with standard_ "looks" like a vm_size,
                        # at least for logging purposes
                        possible_vm_sizes = [
                            f for f in features if f.startswith("standard_")
                        ]
                        if possible_vm_sizes:
                            # we do allow users to use features that start with standard_,
                            # but we want to warn them at least. For example, someone puts
                            # standard_f2_2v instead of _v2, or someone unchecks the
                            # relevant vm_size from the CycleCloud cluster.
                            logging.warning(
                                "Found potential vm_size %s - however none match approved"
                                " vm sizes %s. Is this a typo?",
                                ",".join(possible_vm_sizes),
                                ",".join(self.nodearray_machine_types),
                            )
                        # if we get here, we will just use our machine type as the vm_size.
                        # HOWEVER we bump the logging up to warning if there are multiple
                        # vm_sizes defined for this nodearray, as users _should_ be specific
                        # about which vm size they want when allowing more than one per
                        # nodearray.
                        log_level = (
                            logging.WARNING
                            if len(self.nodearray_machine_types) > 1
                            else logging.DEBUG
                        )
                        logging.log(
                            log_level,
                            "No vm_size feature for node %s is defined, assuming vm_size %s",
                            node["NodeName"],
                            self.machine_type,
                        )
                        ret.append(node["NodeName"])

            self.__dynamic_node_list_cache = slutil.to_hostlist(ret) if ret else ""

        return self.__dynamic_node_list_cache

    def _static_all_nodes(self) -> List[str]:
        ret = []
        for bucket in self.buckets:
            ret.extend(self.node_list_by_pg[bucket.placement_group])
        ret = sorted(ret, key=slutil.get_sort_key_func(self.is_hpc))
        return ret

    def all_nodes(self) -> List[str]:
        if not self.dynamic_feature:
            return self._static_all_nodes()
        return slutil.from_hostlist(self.node_list)

    @property
    def pcpu_count(self) -> int:
        return self.buckets[0].pcpu_count

    @property
    def gpu_count(self) -> int:
        if "slurm_gpus" in self.buckets[0].resources:
            return self.buckets[0].resources["slurm_gpus"]
        return self.buckets[0].gpu_count


def _construct_node_list(
    partition: Partition,
) -> Dict[Optional[PlacementGroup], List[str]]:
    if partition.dynamic_feature:
        return _construct_dynamic_node_list(partition)
    else:
        return _construct_static_node_list(partition)


def _construct_dynamic_node_list(
    partition: Partition,
) -> Dict[Optional[PlacementGroup], List[str]]:
    part_node_names = slutil.from_hostlist(partition.node_list)
    return {None: part_node_names}


def _construct_static_node_list(
    partition: Partition,
) -> Dict[Optional[PlacementGroup], List[str]]:
    name_format = partition.nodename_prefix + "{}-%d"

    valid_node_names = {}
    vm_count = 0
    for bucket in partition.buckets:
        valid_node_names[bucket.placement_group] = list()
        while vm_count < partition.max_vm_count:
            name_index = vm_count + 1
            node_name = name_format.format(partition.nodearray) % (name_index)
            valid_node_names[bucket.placement_group].append(node_name)
            vm_count += 1

    return valid_node_names


def _parse_default_overallocations(
    partition_name: str, overallocation_expr: List
) -> Dict:
    working_overalloc_thresholds = {}
    if len(overallocation_expr) == 0 or len(overallocation_expr) % 2 == 1:
        logging.error(f"Invalid slurm.overallocation expression for {partition_name}.")
        logging.error("Must have an even number of elements, larger than 0. Ignoring")
        return {}

    for i in range(0, len(overallocation_expr), 2):
        threshold = overallocation_expr[i]
        percentage = overallocation_expr[i + 1]

        try:
            threshold = int(threshold)
        except ValueError:
            logging.error(
                f"Invalid threshold at index {i} for partition {partition_name}"
                f" - {threshold}, expected an integer."
            )
            logging.error("You will have to edit autoscale.json manually instead.")
            return {}

        try:
            percentage = float(percentage)
            if percentage < 0 or percentage > 1:
                logging.error(
                    f"Invalid percentage at index {i + 1} for partition {partition_name}"
                    f" - {percentage}. It must be a number between [0, 1]."
                )
                logging.error("You will have to edit autoscale.json manually instead.")
                return {}
        except ValueError:
            logging.error(
                f"Invalid percentage at index {i + 1} for partition {partition_name}"
                f" - {percentage}, expected a number between [0, 1]."
            )
            logging.error("You will have to edit autoscale.json manually instead.")
            return {}

        working_overalloc_thresholds[str(threshold)] = percentage

    return working_overalloc_thresholds


def fetch_partitions(
    node_mgr: NodeManager, include_dynamic: bool = False
) -> List[Partition]:
    """
    Construct a mapping of SLURM partition name -> relevant nodearray information.
    There must be a one-to-one mapping of partition name to nodearray. If not, first
    one wins.
    """
    all_partitions: List[Partition] = []
    split_buckets = hpcutil.partition(
        node_mgr.get_buckets(), lambda b: (b.nodearray, b.vm_size)
    )

    # dict[nodearray, List[vm_size]]
    nodearray_vm_size: Dict[str, List[str]] = {}
    for nodearray, vm_size in split_buckets.keys():
        if nodearray not in nodearray_vm_size:
            nodearray_vm_size[nodearray] = []
        nodearray_vm_size[nodearray].append(vm_size)

    for buckets in split_buckets.values():
        nodearray_name = buckets[0].nodearray
        slurm_config = buckets[0].software_configuration.get("slurm", {})
        dynamic_feature = slurm_config.get("dynamic_feature")

        # This is added for backwards compatibility.
        # TODO: Remove this for 4.x
        if not dynamic_feature:
            dynamic_config = slurm_config.get("dynamic_config")
            if dynamic_config:
                toks = dynamic_config.replace('"', "").replace("'", "").split()
                for tok in toks:
                    if "=" in tok:
                        key, value = tok.split("=", 1)
                        if key.lower() == "feature":
                            dynamic_feature = value.strip()

        is_hpc = str(slurm_config.get("hpc", True)).lower() == "true"
        is_autoscale = slurm_config.get("autoscale", True)  # TODO

        if is_autoscale is None:
            logging.warning(
                "Nodearray %s does not define slurm.autoscale, skipping.",
                nodearray_name,
            )
            continue

        if is_autoscale is False:
            logging.debug(
                "Nodearray %s explicitly defined slurm.autoscale=false, skipping.",
                nodearray_name,
            )
            continue

        all_buckets = buckets
        if is_hpc:
            # TODO RDH there should only be one long term.
            # we need to allow the user to pick a new placement group
            # maybe...
            buckets = [b for b in buckets if b.placement_group]
        else:
            buckets = [b for b in buckets if not b.placement_group]
            assert (
                len(buckets) == 1
            ), f"{[(b.nodearray, b.placement_group, is_hpc) for b in all_buckets]}"

        if not buckets:
            continue

        if not nodearray_name:
            logging.error("Name is not defined for nodearray. Skipping")
            continue

        partition_name = slurm_config.get("partition", nodearray_name)
        unescaped_nodename_prefix = slurm_config.get("node_prefix") or ""
        nodename_prefix = re.sub("[^a-zA-Z0-9-]", "-", unescaped_nodename_prefix)

        if unescaped_nodename_prefix != nodename_prefix:
            logging.warning(
                "slurm.node_prefix for partition %s was converted from '%s' to '%s'"
                " due to invalid hostname characters.",
                nodearray_name,
                unescaped_nodename_prefix,
                nodename_prefix,
            )

        if len(buckets) > 1 and not dynamic_feature:
            logging.warning(
                "Multiple buckets defined for nodearray %s, but no dynamic_feature."
                " Using first bucket (vm_size or placement group) only.",
                nodearray_name,
            )
            buckets = [buckets[0]]

        for bucket in buckets:
            machine_type = bucket.vm_size
            if not machine_type:
                logging.warning(
                    "MachineType not defined for nodearray %s. Skipping", nodearray_name
                )
                continue

            limits: BucketLimits = buckets[0].limits

            if limits.max_count <= 0:
                logging.info(
                    "Bucket has a max_count <= 0, defined for machinetype=='%s'.",
                    machine_type,
                )
                # keep this partition around, but we will ignore it when generating later.

            max_scaleset_size = buckets[0].max_placement_group_size

            dampen_memory = None
            dampen_memory_str = slurm_config.get("dampen_memory")
            if dampen_memory_str:
                dampen_memory = int(dampen_memory_str) / 100.0

            if not is_hpc:
                max_scaleset_size = 2 ** 31

            use_pcpu = str(slurm_config.get("use_pcpu", True)).lower() == "true"

            overallocation_expr = slurm_config.get("overallocation") or []
            over_allocation_thresholds = {}
            if overallocation_expr:
                over_allocation_thresholds = _parse_default_overallocations(
                    partition_name, overallocation_expr
                )

            gpu_device_config = slurm_config.get("gpu_device_config") or ""

            all_partitions.append(
                Partition(
                    partition_name,
                    nodearray_name,
                    nodename_prefix,
                    machine_type,
                    slurm_config.get("default_partition", False),
                    is_hpc,
                    max_scaleset_size,
                    buckets,
                    limits.max_count,
                    use_pcpu=use_pcpu,
                    dynamic_feature=dynamic_feature,
                    over_allocation_thresholds=over_allocation_thresholds,
                    nodearray_machine_types=nodearray_vm_size.get(nodearray_name),
                    dampen_memory=dampen_memory,
                    gpu_device_config=gpu_device_config,
                )
            )

    filtered_partitions = []
    by_name = hpcutil.partition(all_partitions, lambda p: p.name)
    for pname, parts in by_name.items():
        all_dyn = set([bool(p.dynamic_feature) for p in parts])
        if len(all_dyn) > 1:
            logging.error(
                "Found two partitions with the same name, but only one is dynamic."
            )
            disabled_parts_message = ["/".join([p.name, p.nodearray]) for p in parts]
            logging.error(f"Disabling {disabled_parts_message}")
        elif len(parts) > 1 and False in all_dyn:
            logging.error(
                "Only partitions with slurm.dynamic_feature may point to more than"
                " one nodearray."
            )
            disabled_parts_message = ["/".join([p.name, p.nodearray]) for p in parts]
            logging.error(f"Disabling {disabled_parts_message}")
        else:
            filtered_partitions.extend(parts)

    default_partitions = [p for p in filtered_partitions if p.is_default]

    if len(default_partitions) == 0:
        logging.warning("slurm.default_partition was not set on any nodearray.")

        # one nodearray, just assume it is the default
        if len(filtered_partitions) == 1:
            logging.info("Only one nodearray was defined, setting as default.")
            filtered_partitions[0].is_default = True
    elif len(default_partitions) > 1:
        # more than one partition was declared the default
        logging.warning("slurm.default_partition was set on more than one nodearray!")

    return filtered_partitions