in community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/slurmsync.py [0:0]
def sync_placement_groups():
    """Delete placement policies that belong to jobs which are no longer active.

    Lists the project's resource policies whose name starts with the cluster
    name, keeps only those following the
    ``<cluster>-slurmgcp-managed-<partition>-<job_id>-<index>`` naming scheme,
    and hands every policy whose job id is not in a "keep" state to
    ``delete_placement_groups``. Each policy passed on is the API record
    merged with the ``partition``, ``job_id`` and ``index`` strings parsed
    from its name.
    """
    keep_states = frozenset(
        [
            "RUNNING",
            "CONFIGURING",
            "STOPPED",
            "SUSPENDED",
            "COMPLETING",
            "PENDING",
        ]
    )

    # Job ids are compared as strings because the ids parsed out of the
    # policy names below are strings as well.
    keep_jobs = {
        str(job.id)
        for job in lookup().get_jobs()
        if job.job_state in keep_states
    }
    keep_jobs.add("0")  # Job 0 is a placeholder for static node placement

    cluster_name = lookup().cfg.slurm_cluster_name
    fields = "items.regions.resourcePolicies,nextPageToken"
    flt = f"name={cluster_name}-*"
    act = lookup().compute.resourcePolicies()
    op = act.aggregatedList(project=lookup().project, fields=fields, filter=flt)

    # Escape the cluster name so it is always matched literally.
    pg_regex = re.compile(
        rf"{re.escape(cluster_name)}-slurmgcp-managed-(?P<partition>[^\s\-]+)-(?P<job_id>\d+)-(?P<index>\d+)"
    )

    placement_groups = {}
    while op is not None:
        result = ensure_execute(op)
        for item in result.get("items", {}).values():
            if not item:
                continue
            # An entry without "resourcePolicies" contributes nothing instead
            # of raising KeyError.
            for pg in item.get("resourcePolicies", []):
                # Match once and reuse the result for filtering and merging.
                match = pg_regex.match(pg["name"])
                if match is None:
                    # Not a placement policy managed by this naming scheme.
                    continue
                if match["job_id"] in keep_jobs:
                    continue
                # merge placement group info from API and job_id,partition,index parsed from the name
                placement_groups[pg["name"]] = {**pg, **match.groupdict()}
        # aggregatedList_next yields the request for the next page; the loop
        # ends once it returns None.
        op = act.aggregatedList_next(op, result)

    if placement_groups:
        delete_placement_groups(list(placement_groups.values()))