# community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/resume.py
def resume_nodes(nodes: List[str], resume_data: Optional[ResumeData]):
"""resume nodes in nodelist"""
lkp = lookup()
# Prevent dormant nodes associated with a future reservation from being resumed
nodes, dormant_fr_nodes = util.separate(lkp.is_dormant_fr_node, nodes)
if dormant_fr_nodes:
log.warning(f"Resume was unable to resume future reservation nodes={dormant_fr_nodes}")
down_nodes_notify_jobs(dormant_fr_nodes, "Reservation is not active, nodes cannot be resumed", resume_data)
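    # Nodes that already have a VM managed by a MIG are not resumed here; they are
    # downed instead so the affected jobs are notified.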
    nodes, flex_managed = util.separate(lambda n: lkp.is_flex_node(n) and (lkp.instance(n) is not None), nodes)
    if flex_managed:
        # TODO(FLEX): This is a weak assumption; the VM may not exist yet still be managed by a MIG (e.g. still provisioning).
        # Inspect all present MIGs instead.
        # This is particularly CRITICAL because ActionOnFailure=DO_NOTHING means deleted VMs will not be repaired.
        log.warning(f"Resume was unable to resume nodes={flex_managed} already managed by MIGs")
        down_nodes_notify_jobs(flex_managed, "VM is already managed by a MIG, cannot be resumed", resume_data)
    if not nodes:
        log.info("No nodes to resume")
        return
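    # Sort by node prefix so that group_nodes_bulk can form contiguous chunks per prefix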
    nodes = sorted(nodes, key=lkp.node_prefix)
    grouped_nodes = group_nodes_bulk(nodes, resume_data, lkp)
    if log.isEnabledFor(logging.DEBUG):
        grouped_nodelists = {
            group: to_hostlist(chunk.nodes) for group, chunk in grouped_nodes.items()
        }
        log.debug(
            "node bulk groups: \n{}".format(yaml.safe_dump(grouped_nodelists).rstrip())
        )
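    # Partition the chunks by provisioning method: TPU nodes, MIG-flex nodes, and
    # regular nodes that are created through the bulkInsert API.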
    tpu_chunks, flex_chunks = [], []
    bi_inserts = {}
    for group, chunk in grouped_nodes.items():
        model = chunk.nodes[0]
        if lkp.node_is_tpu(model):
            tpu_chunks.append(chunk.nodes)
        elif lkp.is_flex_node(model):
            flex_chunks.append(chunk)
        else:
            bi_inserts[group] = create_instances_request(
                chunk.nodes, chunk.placement_group, chunk.excl_job_id
            )
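    # Flex chunks are resumed through their MIGs rather than via bulkInsert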
    for chunk in flex_chunks:
        mig_flex.resume_flex_chunk(chunk.nodes, chunk.excl_job_id, lkp)
    # Execute all bulkInsert requests concurrently, one request per node group
    bulk_ops = dict(
        zip(bi_inserts.keys(), map_with_futures(ensure_execute, bi_inserts.values()))
    )
    log.debug(f"bulk_ops={yaml.safe_dump(bulk_ops)}")
    started = {
        group: op for group, op in bulk_ops.items() if not isinstance(op, Exception)
    }
    failed = {
        group: err for group, err in bulk_ops.items() if isinstance(err, Exception)
    }
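    # For each failed bulkInsert request, log the error and down the affected nodes
    # so that jobs waiting on them are notified.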
    if failed:
        failed_reqs = [f"{group}: {err}" for group, err in failed.items()]
        log.error("bulkInsert API failures: {}".format("; ".join(failed_reqs)))
        for ident, exc in failed.items():
            down_nodes_notify_jobs(grouped_nodes[ident].nodes, f"GCP Error: {exc._get_reason()}", resume_data)  # type: ignore
    if log.isEnabledFor(logging.DEBUG):
        for group, op in started.items():
            group_nodes = grouped_nodelists[group]
            name = op["name"]
            gid = op["operationGroupId"]
            log.debug(
                f"new bulkInsert operation started: group={group} nodes={group_nodes} name={name} operationGroupId={gid}"
            )
    # Wait for all bulkInsert operations to complete and log any errors
    bulk_operations = {group: wait_for_operation(op) for group, op in started.items()}
    # Start TPU nodes after regular nodes so that regular nodes are not affected by the slower TPU startup
    execute_with_futures(tpu.start_tpu, tpu_chunks)
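    # Process the result of each completed bulkInsert operation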
    for group, op in bulk_operations.items():
        _handle_bulk_insert_op(op, grouped_nodes[group].nodes, resume_data)