in covid19_spread/lib/slurm_pool_executor.py [0:0]
def __call__(self):
    """Run this worker's polling loop until the queue reports completion.

    Every iteration claims at most one job inside a database transaction,
    executes it with stdout/stderr redirected to the files exposed by
    ``utils.JobPaths``, and then writes the resulting status and remaining
    retry count back to the ``jobs`` table.  The loop ends once
    ``self.finished(conn)`` is true while no job is ready.
    """
    self.worker_finished = False
    claimed_by = f"worker_{self.worker_id}"
    # Status value written while this worker holds a job.
    # NOTE(review): presumably offset by len(JobStatus) so it cannot equal
    # any JobStatus member -- confirm against the enum's values.
    in_progress_code = len(JobStatus) + self.worker_id + 1
    txn_manager = TransactionManager(self.db_pth)

    def back_off():
        # Double the idle delay after an empty poll, clamped to [1, 30] seconds.
        self.sleep = min(max(self.sleep * 2, 1), 30)

    def claim_job(conn):
        """Select one runnable job and mark it as held by this worker.

        Returns ``(pickle path, job id, retry count)``, or ``None`` when
        there is nothing to run right now.
        """
        candidates = self.fetch_ready_job(conn)
        expected = JobStatus.pending
        if len(candidates) == 0:  # no regular job is ready
            if self.finished(conn):
                # Everything is done: tell the outer loop to stop.
                self.worker_finished = True
                return None
            if self.count_running(conn) > 0:
                # Other jobs are still in flight; poll again later.
                back_off()
                return None
            # Nothing ready and nothing running: fall back to "final" jobs.
            candidates = self.get_final_jobs(conn)
            expected = JobStatus.final
            if len(candidates) == 0:
                back_off()
                return None
            print(
                f"Worker {self.worker_id} is executing final_job: {candidates[0][0]}"
            )

        first = candidates[0]
        pickle_pth, job_id, retries_left = first[0], first[1], first[2]
        # Mark that we're working on this job.  The status condition in the
        # WHERE clause restricts the update to a row still in the state we
        # selected it in.
        # NOTE(review): values are interpolated directly into the SQL text,
        # so a path containing a single quote would break the statement --
        # consider bound parameters if the driver supports them.
        conn.execute(
            "\n"
            f"UPDATE jobs SET status={in_progress_code}, worker_id='{claimed_by}'\n"
            f"WHERE pickle='{pickle_pth}' AND status={expected} AND id='{self.db_pth}'\n"
        )
        return pickle_pth, job_id, retries_left

    while not self.worker_finished:
        if self.sleep > 0:
            print(f"Sleeping for {self.sleep} seconds...")
            time.sleep(self.sleep)
        print(f"Worker {self.worker_id} getting job to run")

        claimed = txn_manager.run(claim_job)
        if claimed is None:
            continue
        self.current_job = claimed
        self.sleep = 0  # a job was found, so reset the back-off
        pickle_pth, job_id, retries_left = claimed
        print(f"Worker {self.worker_id} got job to run: {pickle_pth}")

        # Run the job, sending its output to the job's own log files.
        log_paths = utils.JobPaths(os.path.dirname(pickle_pth), job_id=job_id)
        with log_paths.stderr.open("w", buffering=1) as job_err, log_paths.stdout.open(
            "w", buffering=1
        ) as job_out:
            with redirect_stderr(job_err), redirect_stdout(job_out):
                try:
                    with env_var({"SLURM_PICKLE_PTH": str(pickle_pth)}):
                        submission = utils.DelayedSubmission.load(pickle_pth)
                        submission.result()
                except Exception:
                    retries_left -= 1
                    print(f"Job failed, retry_count = {retries_left}")
                    # NOTE(review): only an exact 0 marks the job failed; a
                    # count that starts at 0 or below stays pending --
                    # confirm that is intended.
                    if retries_left == 0:
                        outcome = JobStatus.failure
                    else:
                        outcome = JobStatus.pending
                    # sys.stderr is the redirected stream here, so the
                    # traceback is written to the job's stderr file.
                    traceback.print_exc(file=sys.stderr)
                else:
                    outcome = JobStatus.success

            # The redirect has ended: these messages go to the worker's own
            # stdout rather than the job's log.
            print(f"Worker {self.worker_id} finished job with status {outcome}")

            def record_outcome(conn):
                # Persist the final status and the remaining retry count.
                return conn.execute(
                    f"UPDATE jobs SET status={outcome.value}, retry_count={retries_left} WHERE pickle='{pickle_pth}' AND id='{self.db_pth}'"
                )

            txn_manager.run(record_outcome)
            self.current_job = None
            print(f"Worker {self.worker_id} updated job status")