in torchx/schedulers/slurm_scheduler.py [0:0]
def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
    """Describe a job by parsing the output of ``sacct --parsable2 -j <app_id>``.

    Each accounting row whose ``JobID`` (the part before the first ``+``)
    equals ``app_id`` is treated as one replica. The role name and replica
    id are taken from the last two ``-``-separated parts of ``JobName``.

    Args:
        app_id: the job id to look up.

    Returns:
        ``None`` when ``sacct`` prints at most one line; otherwise a
        ``DescribeAppResponse`` whose ``state`` and ``msg`` come from the
        last matching row (``AppState.UNKNOWN`` and ``""`` if no row matched).

    Raises:
        subprocess.CalledProcessError: if ``sacct`` exits with a non-zero
            status.
        AssertionError: if a row's state cannot be translated through
            ``SLURM_STATES``.
    """
    p = subprocess.run(
        ["sacct", "--parsable2", "-j", app_id], stdout=subprocess.PIPE, check=True
    )
    output = p.stdout.decode("utf-8").split("\n")
    if len(output) <= 1:
        return None

    # --parsable2 emits a header line followed by "|"-delimited rows.
    reader = csv.DictReader(output, delimiter="|")

    roles = {}
    roles_statuses = {}
    msg = ""
    app_state = AppState.UNKNOWN
    for row in reader:
        job_id, *parts = row["JobID"].split("+")
        if job_id != app_id:
            continue
        if parts and "." in parts[0]:
            # we only care about the worker not the child jobs
            continue

        state = row["State"]
        msg = state
        state_enum = SLURM_STATES.get(state)
        if not state_enum:
            # sacct can report a state with trailing detail (for example
            # "CANCELLED by <uid>"); retry with the leading state token only.
            # msg keeps the full string so the detail is not lost.
            state_enum = SLURM_STATES.get(state.partition(" ")[0])
        if not state_enum:
            # Raised explicitly rather than via ``assert`` so the check is
            # still enforced when Python runs with -O.
            raise AssertionError(
                f"failed to translate slurm state {state} to torchx state"
            )
        app_state = state_enum

        name_parts = row["JobName"].split("-")
        if len(name_parts) < 3:
            # name should always have at least 3 parts but sometimes sacct
            # is slow to update
            continue
        role = name_parts[-2]
        replica_id = int(name_parts[-1])

        if role not in roles:
            roles[role] = Role(name=role, num_replicas=0, image="")
            roles_statuses[role] = RoleStatus(role, [])
        roles[role].num_replicas += 1
        roles_statuses[role].replicas.append(
            ReplicaStatus(id=replica_id, role=role, state=app_state, hostname=""),
        )

    return DescribeAppResponse(
        app_id=app_id,
        roles=list(roles.values()),
        roles_statuses=list(roles_statuses.values()),
        state=app_state,
        msg=msg,
    )