in community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/resume.py [0:0]
def group_nodes_bulk(nodes: List[str], resume_data: Optional[ResumeData], lkp: util.Lookup):
    """Group nodes by nodeset, placement group, and exclusive job id (if any)."""
    if resume_data is None:  # all nodes will be considered jobless
        resume_data = ResumeData(jobs=[])

    nodes_set = set(nodes)  # turn into set to simplify intersection
    non_excl = nodes_set.copy()
    groups: Dict[Optional[int], List[PlacementAndNodes]] = {}  # excl_job_id (or None) -> [PlacementAndNodes]

    # expand all exclusive job nodelists
    for job in resume_data.jobs:
        if not lkp.cfg.partitions[job.partition].enable_job_exclusive:
            continue

        groups[job.job_id] = []
        # placement group assignment is based on all allocated nodes, ...
        for pn in create_placements(job.nodes_alloc, job.job_id, lkp):
            groups[job.job_id].append(
                PlacementAndNodes(
                    placement=pn.placement,
                    # ... but we only want to handle nodes in nodes_resume in this run.
                    nodes=sorted(set(pn.nodes) & nodes_set)
                ))
        non_excl.difference_update(job.nodes_alloc)

    # nodes not claimed by any exclusive job are grouped under the None key
    groups[None] = create_placements(sorted(non_excl), excl_job_id=None, lkp=lkp)

    def chunk_nodes(nodes: List[str]):
        # Chunk size depends on the node type: flex nodes are bounded by the zonal
        # MIG size limit, TPU nodes by the TPU slice vmcount, everything else by
        # the bulk-insert API limit.
        if not nodes:
            return []
        model = nodes[0]
        if lkp.is_flex_node(model):
            chunk_size = ZONAL_MIG_SIZE_LIMIT
        elif lkp.node_is_tpu(model):
            ns_name = lkp.node_nodeset_name(model)
            chunk_size = tpu.TPU.make(ns_name, lkp).vmcount
        else:
            chunk_size = BULK_INSERT_LIMIT
        return chunked(nodes, n=chunk_size)

    # one BulkChunk per (exclusive job, placement, chunk) combination,
    # skipping placements that have no nodes to resume in this run
    chunks = [
        BulkChunk(
            nodes=nodes_chunk,
            prefix=lkp.node_prefix(nodes_chunk[0]),  # <cluster_name>-<nodeset_name>
            excl_job_id=job_id,
            placement_group=pn.placement,
            chunk_idx=i)
        for job_id, placements in groups.items()
        for pn in placements if pn.nodes
        for i, nodes_chunk in enumerate(chunk_nodes(pn.nodes))
    ]

    return {chunk.name: chunk for chunk in chunks}
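
For illustration, a minimal, self-contained sketch of the chunking step in isolation. It assumes more_itertools.chunked behaves like the chunked() helper used above; the limit value and node names are made up, not the script's constants:

# Illustration only: how a flat node list splits into bulk-insert-sized chunks.
from more_itertools import chunked  # assumed equivalent of the chunked() used above

BULK_INSERT_LIMIT_DEMO = 5000  # illustrative stand-in for the real limit

def chunk_nodes_demo(nodes):
    """Split a node list into chunks no larger than the (illustrative) limit."""
    return list(chunked(nodes, n=BULK_INSERT_LIMIT_DEMO)) if nodes else []

nodes = [f"mycluster-compute-{i}" for i in range(12_000)]
for i, nodes_chunk in enumerate(chunk_nodes_demo(nodes)):
    # each chunk would become one BulkChunk, distinguished by prefix + chunk index
    print(f"chunk {i}: {len(nodes_chunk)} nodes, first={nodes_chunk[0]}")
# -> chunk 0: 5000 nodes ... chunk 1: 5000 nodes ... chunk 2: 2000 nodes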