in libs/libcommon/src/libcommon/queue/jobs.py [0:0]
def get_zombies(self, max_seconds_without_heartbeat: float) -> list[JobInfo]:
    """Get the zombie jobs.

    It returns jobs without recent heartbeats, which means they crashed at one point and became zombies.
    Usually `max_seconds_without_heartbeat` is a factor of the time between two heartbeats.

    Only jobs with status `Status.STARTED` are inspected. For each one, the reference time is
    `last_heartbeat` or, when no heartbeat was ever recorded, `started_at`. The job is a zombie
    when that reference time is at least `max_seconds_without_heartbeat` seconds in the past.
    Jobs with neither timestamp are never reported.

    Args:
        max_seconds_without_heartbeat (`float`): the maximum silence, in seconds, tolerated before
            a started job is considered a zombie. If zero or negative, the detection is disabled
            and an empty list is returned.

    Returns:
        `list[JobInfo]`: an array of the zombie job infos.
    """
    if max_seconds_without_heartbeat <= 0:
        return []

    # Loop invariants: read the clock and build the tolerance once, so every job is judged
    # against the same instant instead of a slightly later "now" per job.
    now = get_datetime()
    max_silence = timedelta(seconds=max_seconds_without_heartbeat)

    zombie_infos = []
    for job in JobDocument.objects(status=Status.STARTED):
        # A job that has not sent any heartbeat yet is timed from its start instead.
        last_seen = job.last_heartbeat if job.last_heartbeat is not None else job.started_at
        if last_seen is None:
            continue
        # localize() attaches UTC to the stored value so it can be compared with `now`
        # (presumably stored datetimes are naive UTC - TODO confirm).
        if now >= pytz.UTC.localize(last_seen) + max_silence:
            zombie_infos.append(job.info())
    return zombie_infos