in integration/preemption.py [0:0]
def preemption():
job = pascal_job("learnfair", timeout_min=2 * 60)
log.info(f"Scheduled {job}, {job.paths.stdout}")
# log.info(job.paths.submission_file.read_text())
wait_job_is_running(job)
node = job.get_info()["NodeList"]
log.info(f"{job} ({job.state}) is runnning on {node} !")
# Schedule another pascal job on the same node, whith high priority
priority_job = pascal_job("dev", timeout_min=15, node=node)
log.info(f"Schedule {priority_job} ({job.state}) on {node} with high priority.")
wait_job_is_running(priority_job)
# if priority_job is running, then job should have been preempted
learfair_stderr = job.stderr()
assert learfair_stderr is not None, job.paths.stderr
log.info(
f"Job {priority_job} ({priority_job.state}) started, "
f"job {job} ({job.state}) should have been preempted: {learfair_stderr}"
)
interruptions = [l for l in learfair_stderr.splitlines() if "Interrupted" in l]
assert len(interruptions) == 1, interruptions
assert job.state in ("PENDING"), job.state
interrupted_ts = interruptions[0].split("!!! Interrupted on: ")[-1]
interrupted = datetime.fromisoformat(interrupted_ts)
priority_job.result()
print("Preemption test succeeded ✅")