in libs/libcommon/src/libcommon/queue/jobs.py [0:0]
def _start_newest_job_and_delete_others(self, job: JobDocument) -> JobDocument:
    """Start a job (the newest one for unicity_id) and delete the other ones.

    A lock is used to ensure that the job is not started by another worker.

    Args:
        job (`JobDocument`): the job to start

    Raises:
        [`AlreadyStartedJobError`]: if a started job already exists for the same unicity_id.
        [`LockTimeoutError`]: if the lock could not be acquired after 20 retries.
        [`NoWaitingJobError`]: if no pending job could be found for the unicity_id.

    Returns:
        `JobDocument`: the started job
    """
    # could be a method of Job
    RETRIES = 20
    # uuid is used to differentiate between workers
    # otherwise another worker might acquire the lock
    lock_owner = str(uuid4())
    try:
        # retry for 2 seconds (RETRIES sleeps of 0.1s each)
        with lock(
            key=job.unicity_id,
            owner=lock_owner,
            sleeps=[0.1] * RETRIES,
            ttl=lock.TTL.LOCK_TTL_SECONDS_TO_START_JOB,
        ):
            # get all the pending jobs for the same unicity_id, newest first
            # (renamed from `waiting_jobs`: this queryset contains STARTED jobs too)
            pending_jobs = JobDocument.objects(unicity_id=job.unicity_id).order_by("-created_at")
            # renamed from `datetime` to avoid shadowing the stdlib module name
            started_at = get_datetime()
            # raise if any job has already been started for unicity_id
            num_started_jobs = pending_jobs(status=Status.STARTED).count()
            if num_started_jobs > 0:
                if num_started_jobs > 1:
                    logging.critical(f"job {job.unicity_id} has been started {num_started_jobs} times. Max is 1.")
                raise AlreadyStartedJobError(f"job {job.unicity_id} has been started by another worker")
            # get the most recent one
            first_job = pending_jobs.first()
            if not first_job:
                raise NoWaitingJobError(f"no waiting job could be found for {job.unicity_id}")
            # start it; the WAITING filter makes this a compare-and-set: if another
            # worker already flipped the status, update() matches nothing and returns 0
            if not JobDocument.objects(pk=str(first_job.pk), status=Status.WAITING).update(
                started_at=started_at,
                status=Status.STARTED,
                write_concern={"w": "majority", "fsync": True},
                read_concern={"level": "majority"},
            ):
                raise AlreadyStartedJobError(f"job {job.unicity_id} has been started by another worker")
            update_metrics_for_type(
                dataset=first_job.dataset,
                job_type=first_job.type,
                previous_status=Status.WAITING,
                new_status=Status.STARTED,
                difficulty=first_job.difficulty,
            )
            # and delete the other waiting jobs, if any
            # (loop variable renamed so it no longer shadows the `job` parameter)
            self.delete_waiting_jobs_by_job_id(
                job_ids=[pending_job.pk for pending_job in pending_jobs if pending_job.pk != first_job.pk]
            )
            return first_job.reload()
    except TimeoutError as err:
        raise LockTimeoutError(
            f"could not acquire the lock for job {job.unicity_id} after {RETRIES} retries."
        ) from err