in project/nanoeval/nanoeval/_executor_worker.py [0:0]
def _maybe_pull_task_from_queue() -> tuple[EvalSpec, Task, RecorderProtocol] | None:
# Pull tasks from the monitor queue
with db.conn() as conn:
# Enforce global concurrency limit
num_running = conn.execute(
"""
SELECT COUNT(*) FROM task WHERE executor_pid IS NOT NULL and result IS NULL;
"""
).fetchone()[0]
try:
max_concurrency = int(
conn.execute(
"""
select value from metadata where key = 'max_concurrency';
"""
).fetchone()[0]
)
except TypeError:
logger.exception("Failed to retrieve max_concurrency from metadata")
return None
continue_ok = num_running < max_concurrency
if not continue_ok:
logger.info(
"Max concurrency reached, sleeping. num_running=%s >= max concurrency=%s",
num_running,
max_concurrency,
)
return None
conn.execute("BEGIN EXCLUSIVE;") # Start the transaction
try:
# Step 3: Select the task
cursor = conn.execute(