in ansible/roles/slurm/files/scripts/slurmsync.py [0:0]
def sync_placement_groups():
    """Delete placement policies whose Slurm job is no longer active.

    Does nothing unless this instance's role is "controller". Otherwise it:

    1. Asks scontrol for all jobs and collects the ids of those that have at
       least one state in ``keep_states``.
    2. Pages through the project's resource policies whose name starts with
       the cluster name, keeping only those whose name parses as
       ``<cluster>-<partition>-<job_id>-<index>``.
    3. Hands every parsed policy whose ``job_id`` is not being kept to
       ``delete_placement_groups``.
    """
    if lkp.instance_role_safe != "controller":
        return

    # A job in any of these states may still be using its placement policy.
    keep_states = frozenset(
        [
            "RUNNING",
            "CONFIGURING",
            "STOPPED",
            "SUSPENDED",
            "COMPLETING",
        ]
    )

    def has_keep_state(job):
        """Return True if the job reports at least one state in keep_states."""
        if "job_state" not in job:
            return False
        states = job["job_state"]
        # job_state is normally a list of state flags, but some scontrol JSON
        # formats emit a single string. Iterating a string would compare
        # individual characters and never match, which would get the policy
        # of a live job deleted, so wrap it first.
        if isinstance(states, str):
            states = (states,)
        return bool(keep_states.intersection(states))

    jobs = json.loads(run(f"{lkp.scontrol} show jobs --json").stdout)["jobs"]
    keep_jobs = {str(job["job_id"]) for job in jobs if has_keep_state(job)}
    keep_jobs.add("0")  # Job 0 is a placeholder for static node placement

    fields = "items.regions.resourcePolicies,nextPageToken"
    flt = f"name={lkp.cfg.slurm_cluster_name}-*"
    act = compute.resourcePolicies()
    op = act.aggregatedList(project=lkp.project, fields=fields, filter=flt)

    # Escape the cluster name so it is always matched literally.
    pg_regex = re.compile(
        rf"{re.escape(lkp.cfg.slurm_cluster_name)}"
        r"-(?P<partition>[^\s\-]+)-(?P<job_id>\d+)-(?P<index>\d+)"
    )

    placement_groups = {}
    # The loop ends when aggregatedList_next yields None (no further page).
    while op is not None:
        result = ensure_execute(op)
        policies = chain.from_iterable(
            item["resourcePolicies"]
            for item in result.get("items", {}).values()
            if item
        )
        for policy in policies:
            # Match once; policies whose name does not follow the placement
            # group naming scheme are not managed here and are skipped.
            match = pg_regex.match(policy["name"])
            if match is None:
                continue
            # merge placement group info from API and job_id,partition,index
            # parsed from the name
            pg = NSDict({**policy, **match.groupdict()})
            if pg.get("job_id") not in keep_jobs:
                placement_groups[pg["name"]] = pg
        op = act.aggregatedList_next(op, result)

    if placement_groups:
        delete_placement_groups(list(placement_groups.values()))