def _start_newest_job_and_delete_others()

in libs/libcommon/src/libcommon/queue/jobs.py [0:0]


    def _start_newest_job_and_delete_others(self, job: JobDocument) -> JobDocument:
        """Start a job (the newest one for unicity_id) and delete the other ones.

        A lock is used to ensure that the job is not started by another worker.

        Args:
            job (`JobDocument`): the job to start (the newest waiting job with the same unicity_id is the one actually started)

        Raises:
            [`AlreadyStartedJobError`]: if a started job already exists for the same unicity_id.
            [`NoWaitingJobError`]: if no waiting job could be found for the unicity_id.
            [`LockTimeoutError`]: if the lock could not be acquired after 20 retries.

        Returns:
            `JobDocument`: the started job
        """
        # could be a method of Job
        RETRIES = 20
        # uuid is used to differentiate between workers
        # otherwise another worker might acquire the lock
        lock_owner = str(uuid4())
        try:
            # retry for 2 seconds
            with lock(
                key=job.unicity_id,
                owner=lock_owner,
                sleeps=[0.1] * RETRIES,
                ttl=lock.TTL.LOCK_TTL_SECONDS_TO_START_JOB,
            ):
                # get all the pending jobs for the same unicity_id, newest first
                waiting_jobs = JobDocument.objects(unicity_id=job.unicity_id).order_by("-created_at")
                datetime = get_datetime()
                # raise if any job has already been started for unicity_id
                num_started_jobs = waiting_jobs(status=Status.STARTED).count()
                if num_started_jobs > 0:
                    if num_started_jobs > 1:
                        logging.critical(f"job {job.unicity_id} has been started {num_started_jobs} times. Max is 1.")
                    raise AlreadyStartedJobError(f"job {job.unicity_id} has been started by another worker")
                # get the most recent one
                first_job = waiting_jobs.first()
                if not first_job:
                    raise NoWaitingJobError(f"no waiting job could be found for {job.unicity_id}")
                # start it
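                # the conditional update only matches the document while it is still WAITING, so it
                # acts as a compare-and-set: if another worker started it first, 0 documents are
                # updated and we raise; the majority write/read concerns make the change durable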
                if not JobDocument.objects(pk=str(first_job.pk), status=Status.WAITING).update(
                    started_at=datetime,
                    status=Status.STARTED,
                    write_concern={"w": "majority", "fsync": True},
                    read_concern={"level": "majority"},
                ):
                    raise AlreadyStartedJobError(f"job {job.unicity_id} has been started by another worker")
                update_metrics_for_type(
                    dataset=first_job.dataset,
                    job_type=first_job.type,
                    previous_status=Status.WAITING,
                    new_status=Status.STARTED,
                    difficulty=first_job.difficulty,
                )
                # and delete the other waiting jobs, if any
                self.delete_waiting_jobs_by_job_id(job_ids=[job.pk for job in waiting_jobs if job.pk != first_job.pk])
                return first_job.reload()
        except TimeoutError as err:
            raise LockTimeoutError(
                f"could not acquire the lock for job {job.unicity_id} after {RETRIES} retries."
            ) from err
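
For context, callers of this internal method typically treat its exceptions as "another worker got there first" rather than as failures. The sketch below only illustrates that pattern: the `queue` object (an instance of the class defining this method), the `candidate_job` variable, the `try_to_start` helper and the exception import path are assumptions, not part of the documented file.

# Illustrative sketch only (not from the documented file): calling
# _start_newest_job_and_delete_others and reacting to the errors it raises.
# The import path and the `queue`/`candidate_job` names are assumptions.
from libcommon.queue.jobs import (
    AlreadyStartedJobError,
    LockTimeoutError,
    NoWaitingJobError,
)

def try_to_start(queue, candidate_job):
    """Return the started JobDocument, or None if another worker won the race."""
    try:
        return queue._start_newest_job_and_delete_others(candidate_job)
    except (AlreadyStartedJobError, NoWaitingJobError):
        # a job with the same unicity_id is already running, or the waiting
        # duplicates were already cleaned up: skip this unicity_id
        return None
    except LockTimeoutError:
        # the lock stayed contended for ~2 seconds (20 retries of 0.1 s):
        # back off and let the caller pick another job
        return None

Presumably, in the surrounding queue code these situations are routine under contention and the caller simply moves on to another job.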