def purge_failed_jobs()

in redash/tasks/general.py [0:0]


def purge_failed_jobs():
    with Connection(rq_redis_connection):
        queues = [q for q in Queue.all() if q.name not in default_operational_queues]
        for queue in queues:
            failed_job_ids = FailedJobRegistry(queue=queue).get_job_ids()
            failed_jobs = Job.fetch_many(failed_job_ids, rq_redis_connection)
            stale_jobs = []
            for failed_job in failed_jobs:
                # the job may not actually exist anymore in Redis
                if not failed_job:
                    continue
                # the job could have an empty ended_at value in case
                # of a worker dying before it can save the ended_at value,
                # in which case we also consider them stale
                if not failed_job.ended_at:
                    stale_jobs.append(failed_job)
                elif (
                    datetime.utcnow() - failed_job.ended_at
                ).total_seconds() > settings.JOB_DEFAULT_FAILURE_TTL:
                    stale_jobs.append(failed_job)

            for stale_job in stale_jobs:
                stale_job.delete()

            if stale_jobs:
                logger.info(
                    "Purged %d old failed jobs from the %s queue.",
                    len(stale_jobs),
                    queue.name,
                )