# project/paperbench/paperbench/nano/utils.py
import logging
from datetime import datetime, timedelta
from pathlib import Path
import blobfile as bf
import structlog
from alcatraz.clusters.local import LocalConfig
from paperbench.constants import AGENT_DIR, SUBMISSION_DIR, WORKSPACE_BASE
from paperbench.metrics import EvaluationRun, PaperEvaluation
from paperbench.utils import (
get_default_runs_dir,
get_experiments_dir,
get_paperbench_data_dir,
is_docker_running,
)
def get_split_to_expected_papers() -> dict[str, int]:
"""
Reads the split files in experiments/splits and returns a dictionary
mapping split names to the number of papers in each split.
"""
split_to_expected_papers = {}
splits_dir = get_experiments_dir() / "splits"
for split_file in splits_dir.glob("*.txt"):
split_name = split_file.stem
with open(split_file, "r") as f:
# Count non-empty lines in the file
papers = [line.strip() for line in f if line.strip()]
split_to_expected_papers[split_name] = len(papers)
return split_to_expected_papers
SPLIT_TO_EXPECTED_PAPERS = get_split_to_expected_papers()
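# Illustrative shape of the mapping above (hypothetical split files, not ones shipped with
# the repo): a splits dir containing "dev.txt" with 3 paper ids and "all.txt" with 20 would
# yield {"dev": 3, "all": 20}; blank lines in the split files are ignored.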
def gather_eval_runs(results: list["PaperBenchResult"], n_runs: int) -> list[EvaluationRun]:
"""
    Gathers successfully graded results of nano/eval into a list of `n_runs` EvaluationRuns,
    where a single EvaluationRun does not contain more than one evaluation of the same paper.
"""
seed_to_eval_run = {
seed: EvaluationRun(seed=seed, paper_evaluations={}) for seed in range(n_runs)
}
paper_to_cur_seed = {}
if not results:
return list(seed_to_eval_run.values())
for result in results:
if result.judge_output is None or result.judge_output.graded_task_tree is None:
continue
paper_id = result.paper_id
if paper_id not in paper_to_cur_seed:
paper_to_cur_seed[paper_id] = 0
seed = paper_to_cur_seed[paper_id]
paper_to_cur_seed[paper_id] += 1
paper_eval = PaperEvaluation(
paper_id=paper_id,
graded_task_node=result.judge_output.graded_task_tree,
paper_run_id=result.run_id,
)
seed_to_eval_run[seed].paper_evaluations[paper_id] = paper_eval
return list(seed_to_eval_run.values())
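# Illustrative seed assignment for gather_eval_runs (hypothetical inputs): with n_runs=3,
# two successfully graded results for paper "p1" and one for paper "p2", the first "p1"
# result lands in the seed-0 EvaluationRun, the second in seed-1, and the "p2" result in
# seed-0, so no EvaluationRun ever holds two evaluations of the same paper. Results without
# a graded task tree are skipped, and at most n_runs graded results per paper are expected
# (a further result for the same paper would raise a KeyError above).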
def uses_local_config(paperbench: "PaperBench") -> bool: # type: ignore
"""
Check if any of paperbench.solver.cluster_config, paperbench.reproduction.cluster_config,
or paperbench.judge.cluster_config is an instance of LocalConfig.
Args:
paperbench: A PaperBench PythonCodingEval instance
Returns:
bool: True if any of the cluster configs is a LocalConfig, False otherwise
"""
    # PythonCodingSolver may not have a cluster_config; for now only ExternalPythonCodingSolver does
if hasattr(paperbench.solver, "cluster_config"):
if isinstance(paperbench.solver.cluster_config, LocalConfig):
return True
# Check reproduction's cluster_config
if isinstance(paperbench.reproduction.cluster_config, LocalConfig):
return True
# Check judge's cluster_config
if isinstance(paperbench.judge.cluster_config, LocalConfig):
return True
return False
def build_agent_command(agent: "Agent") -> str: # type: ignore
"""Builds the command to run the agent."""
cmd = ["bash", f"{AGENT_DIR}/start.sh"]
if agent.kwargs_type == "argparse":
for key, value in agent.kwargs.items():
cmd += [f"--{key}", str(value)]
if agent.kwargs_type == "omegaconf":
cmd += [f"{key}={value}" for key, value in agent.kwargs.items()]
return " ".join(cmd)
def build_reproduce_command(task: "PaperBenchTask") -> str: # type: ignore
"""Builds the command to run the reproduction."""
cmd = [
f"python3 {WORKSPACE_BASE}/run_reproduce.py",
f"--submission-path {SUBMISSION_DIR}",
"--out-path /output/reproduction_metadata.json",
]
if task.reproduction.timeout:
cmd.extend(["--timeout", str(task.reproduction.timeout)])
return " ".join(map(str, cmd))
def build_judge_command(judge: "JudgeConfig", task: "PaperBenchTask") -> str: # type: ignore
"""Builds the command to run the judge."""
cmd = [
"/opt/conda/envs/grader/bin/python", # Use the conda env installed in `pb-grader`
f"{WORKSPACE_BASE}/run_judge.py", # Assumes judge script exists here
f"--submission-path {SUBMISSION_DIR}",
f"--paper-id {task.paper_id}",
f"--judge {judge.scaffold}",
"--out-dir /output",
]
if judge.model not in ("dummy", "random"):
cmd.extend(["--model", judge.model])
if judge.reasoning_effort:
cmd.extend(["--reasoning-effort", judge.reasoning_effort])
if judge.code_only:
cmd.append("--code-only")
if judge.max_depth:
cmd.extend(["--max-depth", str(judge.max_depth)])
return " ".join(map(str, cmd))
def get_file_at_duration(
files: list[str], duration_hr: int, logger: logging.Logger
) -> tuple[str, timedelta]:
"""
    Given a list of files with timestamped names, return the file whose timestamp is closest to
    `duration_hr` hours after the earliest file in the list, together with its offset from that
    earliest file.
e.g.
```
files = [
"path/to/file/2024-12-07T10-19-52-GMT.tar.gz",
"path/to/file/2024-12-07T10-49-55-GMT.tar.gz",
"path/to/file/2024-12-07T11-19-56-GMT.tar.gz",
"path/to/file/2024-12-07T11-49-56-GMT_step_10.tar.gz",
"path/to/file/2024-12-07T12-19-58-GMT.tar.gz",
]
    get_file_at_duration(files, 1, logger)
    > ("path/to/file/2024-12-07T11-19-56-GMT.tar.gz", timedelta(hours=1, seconds=4))
```
"""
# Extract timestamps from filenames
timestamps = []
for file in files:
# Extract timestamp string between last / and .tar.gz
ts_str = file.split("/")[-1].replace(".tar.gz", "")
if "step" in ts_str:
ts_str = ts_str.split("_step_")[0]
# Parse timestamp string into datetime
try:
            # Try parsing the trailing timezone name with %Z
            dt = datetime.strptime(ts_str, "%Y-%m-%dT%H-%M-%S-%Z")
        except ValueError:
            # Some platforms don't accept "GMT" for %Z; treat it as a literal suffix instead
            dt = datetime.strptime(ts_str, "%Y-%m-%dT%H-%M-%S-GMT")
timestamps.append(dt)
earliest = min(timestamps)
target = earliest + timedelta(hours=duration_hr)
# Find file with timestamp closest to target
closest_file = min(zip(files, timestamps), key=lambda x: abs((x[1] - target).total_seconds()))
logger.info(
f"Closest file to {duration_hr} hours after earliest file ({earliest}) is {closest_file[0]}"
)
retrieved_file = closest_file[0]
retrieved_duration = closest_file[1] - earliest
return retrieved_file, retrieved_duration
def file_processor(logger, method_name, original_event_dict):
    """structlog processor that appends run- and group-scoped events to their log files."""
    event_dict = original_event_dict.copy()  # Avoid mutating the original
destinations = event_dict.pop("destinations", [])
run_group_id = event_dict.pop("run_group_id", None)
run_id = event_dict.pop("run_id", None)
runs_dir = event_dict.pop("runs_dir", get_default_runs_dir())
if "run" in destinations and run_group_id and run_id:
dst = bf.join(runs_dir, run_group_id, run_id, "run.log")
with bf.BlobFile(dst, "a") as f:
f.write(str(event_dict) + "\n")
if "group" in destinations and run_group_id:
dst = bf.join(runs_dir, run_group_id, "group.log")
with bf.BlobFile(dst, "a") as f:
f.write(str(event_dict) + "\n")
return original_event_dict
def filter_processor(logger, method_name, event_dict):
    """structlog processor that strips routing keys and drops events not destined for the console."""
destinations = event_dict.pop("destinations", [])
event_dict.pop("run_group_id", None)
event_dict.pop("run_id", None)
event_dict.pop("runs_dir", None)
if "console" not in destinations:
raise structlog.DropEvent()
return event_dict
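# Minimal sketch of how these processors could be wired into structlog (the real
# configuration lives elsewhere in paperbench; the processor chain below is an
# assumption). file_processor must run before filter_processor, since the latter
# pops "destinations" and drops any event not bound for the console:
#
#   structlog.configure(
#       processors=[
#           file_processor,    # tee "run"/"group" events into run.log / group.log
#           filter_processor,  # keep only "console" events for terminal output
#           structlog.dev.ConsoleRenderer(),
#       ]
#   )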
def run_sanity_checks(paperbench: "PaperBench"):
    """Fail fast if the environment cannot support the requested PaperBench run."""
    if uses_local_config(paperbench):
        assert is_docker_running(), (
            "Docker is not running, but a local config was requested."
            " Please ensure Docker is running if you wish to use `LocalConfig` for any of the `cluster_config`s."
        )
    # Check the dataset has been pulled from git lfs (un-pulled pointer stubs are only a few lines long)
    papers_dir = get_paperbench_data_dir() / "papers"
    papers = list(papers_dir.glob("**/paper.md"))
    for paper in papers:
        with open(paper, "r") as f:
            assert len(f.readlines()) > 5, f"Paper at {paper} has not been pulled from git lfs"