def __call__()

in covid19_spread/lib/slurm_pool_executor.py [0:0]


    def __call__(self):
        """Run this worker's job loop until all jobs are reported finished.

        Each iteration claims at most one job through
        ``TransactionManager.run``. A job is either a ready one
        (``fetch_ready_job``) or, when nothing is ready and nothing is
        running, a final one (``get_final_jobs``). The claimed job is marked
        with this worker's running status, executed with stdout/stderr
        redirected to the job's log files, and its outcome is written back to
        the ``jobs`` table.

        When no job can be claimed, ``self.sleep`` backs off exponentially
        (1, 2, 4, ... seconds, capped at 30) before the next poll. The loop
        exits once ``self.finished(conn)`` returns true.
        """
        self.worker_finished = False
        worker_job_id = f"worker_{self.worker_id}"
        # Status code marking a job as claimed by *this* worker ("in progress").
        # It is offset past len(JobStatus) so that it differs from the regular
        # statuses (presumably assumes JobStatus values are 0..len-1 — confirm).
        running_status = len(JobStatus) + self.worker_id + 1
        transaction_manager = TransactionManager(self.db_pth)

        def backoff():
            # Exponential back-off: 0 -> 1 -> 2 -> 4 -> ... capped at 30 seconds.
            self.sleep = min(max(self.sleep * 2, 1), 30)

        def claim_job(conn):
            """Pick a runnable job and mark it as claimed by this worker.

            Returns ``(pickle_pth, job_id, retry_count)``, or ``None`` when no
            job can be run right now (or all jobs are finished).
            """
            ready = self.fetch_ready_job(conn)
            claim_status = JobStatus.pending
            if len(ready) == 0:  # no jobs ready
                if self.finished(conn):
                    self.worker_finished = True
                    return None  # all jobs are finished, exiting...

                # Jobs are still running somewhere: wait for them instead of
                # falling through to the final jobs.
                if self.count_running(conn) > 0:
                    backoff()
                    return None

                ready = self.get_final_jobs(conn)
                claim_status = JobStatus.final
                if len(ready) == 0:
                    backoff()
                    return None
                print(
                    f"Worker {self.worker_id} is executing final_job: {ready[0][0]}"
                )

            pickle_pth, job_id, retry_count = ready[0][0], ready[0][1], ready[0][2]
            # Mark that we're working on this job. The `status=` guard only
            # claims the row if it is still in the state we selected it from.
            # NOTE(review): values are interpolated directly into the SQL text;
            # a pickle path containing a single quote would break the statement.
            # Consider a parameterized query using the driver's paramstyle.
            conn.execute(
                f"""
                UPDATE jobs SET status={running_status}, worker_id='{worker_job_id}'
                WHERE pickle='{pickle_pth}' AND status={claim_status.value} AND id='{self.db_pth}'
                """
            )
            return pickle_pth, job_id, retry_count

        while not self.worker_finished:
            if self.sleep > 0:
                print(f"Sleeping for {self.sleep} seconds...")
                time.sleep(self.sleep)
            print(f"Worker {self.worker_id} getting job to run")

            res = transaction_manager.run(claim_job)
            if res is None:
                continue
            self.current_job = res
            self.sleep = 0  # got work: reset the back-off
            pickle_pth, job_id, retry_count = res
            print(f"Worker {self.worker_id} got job to run: {pickle_pth}")

            # Run the job, capturing its output in the job's log files
            # (line-buffered so logs are visible while the job runs).
            job_dir = os.path.dirname(pickle_pth)
            paths = utils.JobPaths(job_dir, job_id=job_id)
            with paths.stderr.open("w", buffering=1) as stderr, paths.stdout.open(
                "w", buffering=1
            ) as stdout:
                with redirect_stderr(stderr), redirect_stdout(stdout):
                    try:
                        with env_var({"SLURM_PICKLE_PTH": str(pickle_pth)}):
                            dl = utils.DelayedSubmission.load(pickle_pth)
                            dl.result()
                            status = JobStatus.success
                    except Exception:
                        retry_count -= 1
                        print(f"Job failed, retry_count = {retry_count}")
                        # `<= 0` rather than `== 0`: a job that started with no
                        # retries left must fail, not be retried forever with an
                        # ever-more-negative counter.
                        # NOTE(review): a retried *final* job is reset to
                        # `pending`, not `final` — confirm this is intended.
                        status = (
                            JobStatus.failure if retry_count <= 0 else JobStatus.pending
                        )
                        traceback.print_exc(file=sys.stderr)

                print(f"Worker {self.worker_id} finished job with status {status}")

                def record_result(conn):
                    # Persist the outcome and the remaining retry budget.
                    return conn.execute(
                        f"UPDATE jobs SET status={status.value}, retry_count={retry_count} WHERE pickle='{pickle_pth}' AND id='{self.db_pth}'"
                    )

                transaction_manager.run(record_result)
                self.current_job = None
                print(f"Worker {self.worker_id} updated job status")