in redash/tasks/general.py [0:0]
def purge_failed_jobs():
with Connection(rq_redis_connection):
queues = [q for q in Queue.all() if q.name not in default_operational_queues]
for queue in queues:
failed_job_ids = FailedJobRegistry(queue=queue).get_job_ids()
failed_jobs = Job.fetch_many(failed_job_ids, rq_redis_connection)
stale_jobs = []
for failed_job in failed_jobs:
# the job may not actually exist anymore in Redis
if not failed_job:
continue
# the job could have an empty ended_at value in case
# of a worker dying before it can save the ended_at value,
# in which case we also consider them stale
if not failed_job.ended_at:
stale_jobs.append(failed_job)
elif (
datetime.utcnow() - failed_job.ended_at
).total_seconds() > settings.JOB_DEFAULT_FAILURE_TTL:
stale_jobs.append(failed_job)
for stale_job in stale_jobs:
stale_job.delete()
if stale_jobs:
logger.info(
"Purged %d old failed jobs from the %s queue.",
len(stale_jobs),
queue.name,
)