in community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/resume.py [0:0]
def create_nodeset_placements(nodes: List[str], excl_job_id: Optional[int], lkp: util.Lookup) -> List[PlacementAndNodes]:
    """Allocate `nodes` to placement groups and create the backing GCE placement policies.

    Args:
        nodes: node names; all are assumed to belong to the same nodeset/region
            (region and max_distance are read from nodes[0]).
        excl_job_id: exclusive job id used to name per-job placements, or None.
        lkp: cluster configuration lookup helper.

    Returns:
        The list of PlacementAndNodes allocations (returned even when some or all
        policies already existed).

    Side effects:
        Issues placement-policy insert requests against the Compute API, waits for
        the submitted operations, and logs warnings/errors. Calls log.fatal (which
        terminates) if any insert request fails for a reason other than
        "alreadyExists".
    """
    placements = _allocate_nodes_to_placements(nodes, excl_job_id, lkp)
    # All nodes come from one nodeset, so region/max_distance from the first node apply to all.
    region = lkp.node_region(nodes[0])
    max_distance = lkp.node_nodeset(nodes[0]).get('placement_max_distance')

    if log.isEnabledFor(logging.DEBUG):
        debug_p = {p.placement: to_hostlist(p.nodes) for p in placements}
        log.debug(
            f"creating {len(placements)} placement groups: \n{yaml.safe_dump(debug_p).rstrip()}"
        )
    # Only allocations with a non-empty placement name need a policy created.
    requests = {
        p.placement: create_placement_request(p.placement, region, max_distance)
        for p in placements
        if p.placement
    }
    if not requests:
        return placements

    # TODO: aggregate all requests for whole resume and execute them at once (don't limit to nodeset/job)
    ops = dict(
        zip(requests.keys(), map_with_futures(ensure_execute, requests.values()))
    )

    def classify_result(item):
        # item is a (placement_name, op-or-Exception) pair from ops.items().
        op = item[1]
        if not isinstance(op, Exception):
            return "submitted"
        # A policy that already exists is harmless; anything else is a real failure.
        if all(e.get("reason") == "alreadyExists" for e in op.error_details):  # type: ignore
            return "redundant"
        return "failed"

    grouped_ops = dict(util.groupby_unsorted(list(ops.items()), classify_result))
    # Each group is a list of (placement_name, op) pairs; re-dict them per category.
    submitted, redundant, failed = (
        dict(grouped_ops.get(key, {})) for key in ("submitted", "redundant", "failed")
    )
    if redundant:
        log.warning(
            "placement policies already exist: {}".format(",".join(redundant.keys()))
        )
    if failed:
        # BUG FIX: values of `failed` are the Exception objects themselves, not
        # (name, exc) pairs — the old `for _, e in failed.values()` raised
        # TypeError on the very path meant to report failures.
        reqs = [f"{e}" for e in failed.values()]
        log.fatal("failed to create placement policies: {}".format("; ".join(reqs)))
    operations = {group: wait_for_operation(op) for group, op in submitted.items()}
    for group, op in operations.items():
        if "error" in op:
            msg = "; ".join(
                f"{err['code']}: {err.get('message', 'no message')}"
                for err in op["error"]["errors"]
            )
            log.error(
                f"placement group failed to create: '{group}' ({op['name']}): {msg}"
            )
    log.info(
        f"created {len(operations)} placement groups ({to_hostlist(operations.keys())})"
    )
    return placements