in scripts/resume.py [0:0]
def group_nodes_bulk(nodes, resume_data=None):
    """Group nodes by job_id, placement group, node prefix, and bulkInsert size.

    Args:
        nodes: list of node hostnames to resume in this run.
        resume_data: optional object exposing a ``jobs`` iterable; each job
            carries ``job_id``, ``partition``, and Slurm-compressed
            ``nodes_alloc`` / ``nodes_resume`` nodelists. ``None`` means every
            node in ``nodes`` is treated as jobless (e.g. resumed via
            ``scontrol``).

    Returns:
        Tuple ``(grouped_nodes, grouped_nodes_tpu)``: two dicts mapping a
        unique group name to a ``BulkChunk`` / ``BulkChunkTPU`` namedtuple,
        each chunk small enough for one bulkInsert call.

    NOTE: the job objects in ``resume_data.jobs`` are mutated in place
    (nodelists expanded to hostname lists, ``tpu`` / ``placement_groups``
    attributes added).
    """
    if resume_data is None:
        # all nodes will be considered jobless
        jobs = {}
    else:
        jobs = {job.job_id: job for job in resume_data.jobs}

    # Expand all job nodelists from Slurm compressed form to hostname lists.
    for job in jobs.values():
        job.nodelist_alloc = job.nodes_alloc
        job.nodes_alloc = util.to_hostnames(job.nodelist_alloc)
        job.nodelist_resume = job.nodes_resume
        job.nodes_resume = util.to_hostnames(job.nodelist_resume)
        job.tpu = util.part_is_tpu(job.partition)
        if not job.tpu:
            # create placement groups if nodes for job need it
            job.placement_groups = create_placement_groups(
                node_list=job.nodes_alloc,
                job_id=job.job_id,
            )
            # placement group assignment is based on all allocated nodes, but
            # we only want to handle nodes in nodes_resume in this run.
            for pg, pg_nodes in job.placement_groups.items():
                job.placement_groups[pg] = list(
                    set(pg_nodes).intersection(job.nodes_resume)
                )

    # A bit of a hack: nodes resumed using scontrol instead of through job
    # scheduling do not have a job. Partition them into TPU and non-TPU in a
    # single pass (the previous list.remove()-in-a-loop approach was O(n^2)).
    job_resume_nodes = set(
        chain.from_iterable(job.nodes_resume for job in jobs.values())
    )
    jobless_nodes = []
    jobless_nodes_tpu = []
    for node in set(nodes).difference(job_resume_nodes):
        if lkp.node_is_tpu(node):
            jobless_nodes_tpu.append(node)
        else:
            jobless_nodes.append(node)

    # Synthetic "jobs" so jobless nodes flow through the same grouping below.
    jobs["Normal_None"] = NSDict(
        job_id=None,
        nodes_resume=jobless_nodes,
        nodes_alloc=jobless_nodes,
        placement_groups=create_placement_groups(node_list=jobless_nodes),
        partition=None,
        tpu=False,
    )
    jobs["TPU_None"] = NSDict(
        job_id=None,
        nodes_resume=jobless_nodes_tpu,
        nodes_alloc=jobless_nodes_tpu,
        partition=None,
        tpu=True,
    )

    BulkChunk = collections.namedtuple(
        "BulkChunk",
        ["prefix", "job_id", "partition_name", "placement_group", "nodes", "i"],
    )
    BulkChunkTPU = collections.namedtuple(
        "BulkChunkTPU",
        ["prefix", "job_id", "partition_name", "nodes", "i"],
    )

    # Non-TPU: split each job's placement-group nodes by prefix, then into
    # chunks no larger than the bulkInsert API limit.
    grouped_nodes = [
        BulkChunk(
            prefix,
            job_id if job_id != "Normal_None" else None,
            jobs[job_id].partition,
            placement_group,
            chunk_nodes,
            i,
        )
        for job_id, job in jobs.items()
        if not job.tpu
        for placement_group, pg_nodes in job.placement_groups.items()
        for prefix, nodes in util.groupby_unsorted(pg_nodes, lkp.node_prefix)
        for i, chunk_nodes in enumerate(chunked(nodes, n=BULK_INSERT_LIMIT))
    ]
    # TPU: no placement groups; chunk size is determined by TPU topology.
    grouped_nodes_tpu = [
        BulkChunkTPU(
            prefix,
            job_id if job_id != "TPU_None" else None,
            jobs[job_id].partition,
            chunk_nodes,
            i,
        )
        for job_id, job in jobs.items()
        if job.tpu
        for prefix, nodes in util.groupby_unsorted(job.nodes_resume, lkp.node_prefix)
        for i, chunk_nodes in enumerate(lkp.chunk_tpu_nodes(list(nodes)))
    ]

    def group_name(chunk: BulkChunk):
        # Unique, human-readable key: prefix[:job][:placement_group]:chunk_index
        if chunk.placement_group is not None:
            return f"{chunk.prefix}:job{chunk.job_id}:{chunk.placement_group}:{chunk.i}"
        if chunk.job_id is not None:
            return f"{chunk.prefix}:job{chunk.job_id}:{chunk.i}"
        return f"{chunk.prefix}:{chunk.i}"

    def group_name_tpu(chunk: BulkChunkTPU):
        if chunk.job_id is not None:
            return f"{chunk.prefix}:job{chunk.job_id}:{chunk.i}"
        return f"{chunk.prefix}:{chunk.i}"

    grouped_nodes = {group_name(chunk): chunk for chunk in grouped_nodes}
    grouped_nodes_tpu = {group_name_tpu(chunk): chunk for chunk in grouped_nodes_tpu}
    return grouped_nodes, grouped_nodes_tpu