in scripts/resume.py [0:0]
def create_nodeset_placement_groups(node_list: list, job_id=0):
    """Create placement groups for nodes that all belong to one nodeset.

    The first node is used as the model for looking up the nodeset and the
    region, so every node in ``node_list`` is assumed to share both
    (NOTE(review): confirm that callers group nodes by nodeset first).

    Args:
        node_list: node names to place.
        job_id: value embedded in each generated placement group name.

    Returns:
        A dict mapping placement group name to the chunk of nodes assigned to
        it. Chunks hold at most ``PLACEMENT_MAX_CNT`` nodes. When placement is
        disabled for the nodeset, or ``valid_placement_nodes`` rejects the
        nodes, the result is ``{None: node_list}``. An empty ``node_list``
        yields ``{}``.

        The mapping always contains every planned group, including groups
        whose creation failed; failures are only logged.
    """
    if not node_list:
        # Nothing to place; also avoids StopIteration from next() below.
        return {}

    model = next(iter(node_list))
    nodeset = lkp.node_nodeset(model)
    if not nodeset.enable_placement:
        return {None: node_list}
    if not valid_placement_nodes(node_list):
        return {None: node_list}
    region = lkp.node_region(model)

    groups = {
        f"{cfg.slurm_cluster_name}-{nodeset.nodeset_name}-{job_id}-{i}": nodes
        for i, nodes in enumerate(chunked(node_list, n=PLACEMENT_MAX_CNT))
    }

    # Guarded so the hostlist/YAML rendering is skipped unless DEBUG is on.
    if log.isEnabledFor(logging.DEBUG):
        debug_groups = {
            group: to_hostlist_fast(nodes) for group, nodes in groups.items()
        }
        log.debug(
            f"creating {len(groups)} placement groups: \n{yaml.safe_dump(debug_groups).rstrip()}"
        )

    requests = {group: create_placement_request(group, region) for group in groups}
    # Each value is either the submitted operation or the Exception raised
    # while submitting it (see the isinstance check in classify_result).
    ops = dict(
        zip(requests.keys(), map_with_futures(ensure_execute, requests.values()))
    )

    def classify_result(item):
        """Bucket a (group, op) pair as submitted, redundant or failed."""
        op = item[1]
        if not isinstance(op, Exception):
            return "submitted"
        # error_details may be missing, empty, or a plain string. Only a
        # non-empty list whose entries all say "alreadyExists" is redundant;
        # all() over an empty sequence would otherwise be vacuously true.
        details = getattr(op, "error_details", None)
        if (
            isinstance(details, (list, tuple))
            and details
            and all(
                isinstance(e, dict) and e.get("reason") == "alreadyExists"
                for e in details
            )
        ):
            return "redundant"
        return "failed"

    grouped_ops = dict(util.groupby_unsorted(list(ops.items()), classify_result))
    submitted, redundant, failed = (
        dict(grouped_ops.get(key, {})) for key in ("submitted", "redundant", "failed")
    )

    if redundant:
        log.warning(
            "placement policies already exist: {}".format(",".join(redundant.keys()))
        )
    if failed:
        # failed maps group -> exception, so iterate the values directly.
        reqs = [f"{e}" for e in failed.values()]
        log.fatal("failed to create placement policies: {}".format("; ".join(reqs)))

    operations = {group: wait_for_operation(op) for group, op in submitted.items()}
    created = []
    for group, op in operations.items():
        if "error" not in op:
            created.append(group)
            continue
        msg = "; ".join(
            f"{err['code']}: {err.get('message', 'no message')}"
            for err in op["error"]["errors"]
        )
        log.error(
            f"placement group failed to create: '{group}' ({op['name']}): {msg}"
        )

    # Report only the groups whose operation finished without an error.
    log.info(
        f"created {len(created)} placement groups ({to_hostlist_fast(created)})"
    )
    return groups