# swelancer.py
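"""SWELancer evaluation for nanoeval.

Defines SWELancerTask (container setup, submission checks, and grading for the
"ic_swe" and "swe_manager" variants) and SWELancerEval (task loading from
swelancer_tasks.csv, per-task conversation logging, and a results summary with
earnings and per-variant accuracy).
"""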
import asyncio
import json
import ast
import re
import os
from typing import Any, Literal, Sequence, assert_never, get_args
from typing_extensions import TypedDict
from uuid import uuid4
import structlog.stdlib
from openai.types.chat import ChatCompletionMessageParam
from typing_extensions import override
from nanoeval.eval import RetryableSystemError
import chz
from nanoeval.asyncio_utils import generator_with_cleanup
from nanoeval.solvers.computer_tasks.code_execution_interface import (
ComputerInterface,
JupyterComputerInterface,
)
from nanoeval.solvers.computer_tasks.solver import PythonCodingEval, strip_all_metadata
from nanoeval.solvers.computer_tasks.steps import (
FinalResult,
FinalResultSuccessful,
FinalResultWithException,
)
from nanoeval.solvers.computer_tasks.task import ComputerTask, Grade
import random
import string
import pandas as pd
from dataclasses import field
from dotenv.main import dotenv_values
logger = structlog.stdlib.get_logger(component=__name__, _print=True)
SWELancerSplit = Literal[
"Diamond-$500k",
]
ALL_SPLITS: set[SWELancerSplit] = set(get_args(SWELancerSplit))
# Constant ENV vars
PUSHER_APP_KEY = "".join(random.choices(string.ascii_letters + string.digits, k=20))
PUSHER_APP_SECRET = "".join(random.choices(string.ascii_letters + string.digits, k=20))
PUSHER_APP_ID = "".join(random.choices(string.digits, k=7))
class SwelancerInstance(TypedDict):
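    """Identifies the repository a task runs against (e.g. "expensify")."""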
repo: str
def tests_failed_in_pytest_log(pytest_log: str) -> bool:
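    """Return True if the pytest log contains a summary line reporting failures.

    Illustrative pytest-style summary lines:

    >>> tests_failed_in_pytest_log("==== 2 failed, 3 passed in 1.23s ====")
    True
    >>> tests_failed_in_pytest_log("==== 5 passed in 0.51s ====")
    False
    """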
lines = pytest_log.split("\n")
summary_pattern = re.compile(r"=+\s+(\d+)\s+failed.*in\s+[\d\.]+s")
return any(summary_pattern.search(line) for line in lines)
class SWELancerGrade(Grade):
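    """Grade plus the patch and grade-report paths (set to empty strings here)."""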
patch_path: str
grade_report_path: str | None
class SWELancerTask(ComputerTask):
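    """A single SWELancer task.

    `variant` is either "ic_swe" (the model must produce a code patch) or
    "swe_manager" (the model must write its chosen proposal to
    /app/expensify/manager_decisions.json). `price` is the task's payout in
    dollars and is used to report earnings in the grader log.
    """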
instance: SwelancerInstance | None
price: float
variant: str
acceptable_folders: list[str] = []
manager_data: dict[str, Any] | None = None
manager_commit: str | None = None
retry_message: str = "The task isn't complete yet! You must write your output to this path exactly: '/app/expensify/manager_decisions.json'. You cannot submit to /mnt/data!"
# Install jupyter in the correct conda environment and start it within that environment.
jupyter_setup: Sequence[str] | None = (
"bash",
"-c",
"conda run -n testbed pip install jupyter && conda run -n testbed jupyter kernel --ip 0.0.0.0",
)
cwd: str = "/app/expensify"
@override
async def _setup(self, computer: ComputerInterface) -> None:
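        """Prepare the container.

        Verifies the Jupyter/conda environment, resets to the manager commit
        (swe_manager) or waits for /setup_done.txt (ic_swe), password-zips and
        removes /app/tests, re-initialises git, and creates a temp commit for
        ic_swe so the model's changes can later be diffed.
        """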
# Assert various things about the environment
print("SETUP")
if isinstance(computer, JupyterComputerInterface):
await computer.check_execute(
"""import os; assert os.environ.get('CONDA_DEFAULT_ENV') == 'testbed', os.environ.get('CONDA_DEFAULT_ENV')""",
)
logger.info("Correct Jupyter environment is loaded.")
if self.variant == "swe_manager":
await computer.check_shell_command(f"bash -c 'git reset --hard {self.manager_commit}^'")
else:
print("Waiting for setup to be done")
while True:
res = await computer.send_shell_command("cat /setup_done.txt")
if res.output.decode("utf-8").strip() == "done":
print("Setup is done")
break
await asyncio.sleep(15)
# Zip the tests directory with a password
print('Zipping tests')
await computer.check_shell_command(
"apt-get install -y zip unzip"
)
await computer.check_shell_command(
"cd /app/tests && zip -P 'secretpasswordhere' -r /app/tmp.zip . && mv /app/tmp.zip /app/tests.zip"
)
# Clear tests directory
await computer.send_shell_command("rm -rf /app/tests")
# Purge existing git repo and create a new one
await computer.check_shell_command("rm -rf .git")
await computer.check_shell_command("git init")
await computer.send_shell_command("find / -type f -name '*.orig' -delete")
# Create a temp commit to mark where the model's changes begin
if self.variant == "ic_swe":
await computer.check_shell_command("bash -c 'git add .'")
await computer.check_shell_command(
'bash -c \'git -c user.name="temp" -c user.email="temp@example.com" commit -m "temp"\''
)
print("Temp commit created")
@override
async def check_for_valid_submission(self, computer: ComputerInterface) -> bool:
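        """Return True if the model produced a non-empty diff (ic_swe) or wrote
        a non-empty manager_decisions.json (swe_manager)."""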
if self.variant == "ic_swe":
res = await computer.check_shell_command("cd /app/expensify && git diff HEAD")
output = res.output.decode("utf-8").strip()
return len(output) > 0
else:
res = await computer.send_shell_command("cat /app/expensify/manager_decisions.json")
output = res.output.decode("utf-8").strip()
return len(output) > 0 and "No such file or directory" not in output
@override
async def grade(self, computer: ComputerInterface) -> SWELancerGrade:
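        """Unzip the password-protected tests and grade the submission.

        swe_manager: compare the submitted decision against the correct proposal.
        ic_swe: run each matching test folder via ansible-playbook; the score is
        1.0 only if at least one test passes and none of the executed tests fail.
        """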
        # Unzip locked testing files
print('Unzipping tests')
print(await computer.check_shell_command(
"unzip -q -P 'secretpasswordhere' -o /app/tests.zip -d /app/tests"
))
if self.variant == "swe_manager":
# === Grade Manager Tasks ===
manager_decisions = await computer.send_shell_command(
"""bash -c 'cat /app/expensify/manager_decisions.json'"""
)
manager_decisions_json = json.loads(manager_decisions.output.decode("utf-8"))
print(manager_decisions_json)
            correct_option = int(self.manager_data["game"]["correct_proposal"]["id"])  # type: ignore
            submitted_option = int(list(manager_decisions_json.values())[0])
            correct = 1.0 if submitted_option == correct_option else 0.0
return SWELancerGrade(
score=correct,
grader_log=json.dumps(
{
"earned": self.price * correct,
"available": self.price,
"variant": self.variant,
}
),
patch_path="",
grade_report_path="",
)
# === Grade SWE Tasks ===
sample_id = f"{self.question_id}.{self.attempt_id}.{self.retry_idx}-{str(uuid4())}"
logger.info("Grading task...", instance_id=sample_id)
        await computer.check_shell_command(
            "git -c core.fileMode=false diff --binary HEAD > /tmp/patch.diff"
        )
        model_patch = await computer.download("/tmp/patch.diff")
grading_computer = computer
# First, we need to identify all applicable test folders.
list_command = "bash -i -c 'ls /app/tests/issues'"
list_res = await grading_computer.send_shell_command(list_command)
folders = list_res.output.decode("utf-8").splitlines()
filtered_folders = [
folder for folder in folders if not folder.startswith("Reintroduce-")
]
# Get the raw GitHub issue ID from the Question ID
issue_id = self.question_id.replace("Reintroduce-", "").split("_")[0]
matching_folders = [
folder for folder in filtered_folders if folder.split("_")[0] == issue_id
]
print(f"Running n={len(matching_folders)} tests for {issue_id}.")
test_results = []
pytest_outputs = []
num_failed = 0
for env_issue_id in matching_folders:
try:
# Set the ISSUE_ID environment variable
set_env_command = f"""echo "export ISSUE_ID={env_issue_id}" >> ~/.bashrc"""
await grading_computer.send_shell_command(f"bash -i -c '{set_env_command}'")
await grading_computer.send_shell_command(
f"""mkdir -p /app/tests/logs/{env_issue_id}"""
)
res = await grading_computer.send_shell_command(
f"""bash -i -c 'export ISSUE_ID={env_issue_id}; ansible-playbook -i "localhost," --connection=local /app/tests/run_fixed_state.yml'"""
)
print(f"Ansible playbook output: {res.output.decode('utf-8')}")
pytest_log = await grading_computer.send_shell_command(
f"""bash -i -c 'export ISSUE_ID={env_issue_id}; cat /app/tests/logs/$ISSUE_ID/pytest.log'"""
)
pytest_output = pytest_log.output.decode("utf-8")
pytest_outputs.append(pytest_output)
print(f"Pytest output: {pytest_output}")
                # The pytest log is missing entirely: the test run itself broke.
                if "No such file or directory" in pytest_output:
                    num_failed += 1
                    continue
                if tests_failed_in_pytest_log(pytest_output):
                    test_results.append(0.0)
                else:
                    # A passing test ends the loop early; any earlier recorded
                    # failure already forces the overall score to 0.
                    test_results.append(1.0)
                    break
            except Exception as e:
                print("Grading error... moving on to the next test folder!")
                print(str(e))
if not test_results:
raise RetryableSystemError("No tests were executed.")
print(f"Test results: {test_results}")
correct = 1.0 if all(result == 1.0 for result in test_results) else 0.0
return SWELancerGrade(
score=correct,
grader_log=json.dumps(
{
"earned": self.price * correct,
"available": self.price,
"pytest_logs": pytest_outputs,
"variant": self.variant,
"model_patch": model_patch.decode("utf-8"),
# You can compute this using percentage of tests passed, how far along the e2e test got, etc.
"partial_credit": correct
}
),
patch_path="",
grade_report_path="",
)
@chz.chz
class SWELancerEval(PythonCodingEval):
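    """nanoeval entry point for SWELancer.

    Loads tasks from swelancer_tasks.csv, runs the configured solver with
    per-task conversation logging, and summarises earnings and per-variant
    accuracy in get_summary.
    """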
split: SWELancerSplit = "Diamond-$500k"
taskset: list[str] | None = None
@override
def get_name(self) -> str:
return "SWELancer"
@override
async def get_tasks(self) -> list[SWELancerTask]:
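        """Build SWELancerTask objects from swelancer_tasks.csv.

        Expensify URLs and proxy settings come from .env; tasks whose
        question_id contains "Reintroduce-" are skipped, as are tasks outside
        the optional `taskset` filter.
        """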
tasks = pd.read_csv("swelancer_tasks.csv")
env_vars = dotenv_values(".env")
SWEFL_ENV = {
"PUSHER_APP_KEY": PUSHER_APP_KEY,
"PUSHER_APP_SECRET": PUSHER_APP_SECRET,
"PUSHER_APP_ID": PUSHER_APP_ID,
"USE_WEB_PROXY": env_vars["USE_WEB_PROXY"],
"EXPENSIFY_URL": env_vars["EXPENSIFY_URL"],
"NEW_EXPENSIFY_URL": env_vars["NEW_EXPENSIFY_URL"],
"ISSUE_ID": "0",
"LC_ALL": "C.UTF-8",
"EVAL_VARIANT": "ic_swe",
}
docker_image = "swelancer:latest"
swelancer_tasks = []
i = 0
for task in tasks.to_dict(orient="records"):
if self.taskset and task["question_id"] not in self.taskset:
continue
# task['all_proposals'] = ast.literal_eval(task['all_proposals'])
task['prompt'] = ast.literal_eval(task['prompt'])
task['acceptable_folders'] = ast.literal_eval(task['acceptable_folders'])
if str(task['manager_data']) == 'nan':
task['manager_data'] = None
else:
task['manager_data'] = ast.literal_eval(task['manager_data'])
if str(task['manager_commit']) == 'nan':
task['manager_commit'] = None
if "Reintroduce-" in task["question_id"]:
continue
SWEFL_ENV["ISSUE_ID"] = task["question_id"]
del task['price_limit']
del task['canary']
swelancer_tasks.append(SWELancerTask(**task, attempt_id=str(i), environment=SWEFL_ENV, grade_every_step=False, docker_image=docker_image, instance=SwelancerInstance(repo="expensify"))) # type: ignore
i += 1
return swelancer_tasks
@override
async def evaluate(self, task: ComputerTask) -> FinalResult:
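        """Run the solver on a task, appending every conversation step to a
        JSONL file under conversation_logs/, and return the final result."""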
last_convo = None
# Create logs directory if it doesn't exist
os.makedirs("conversation_logs", exist_ok=True)
# Create unique log file name using task and attempt IDs
log_filename = f"conversation_logs/{task.question_id}_{task.attempt_id}_{task.retry_idx}.jsonl"
async with generator_with_cleanup(self.solver.run(task)) as gen:
async for step in gen:
if step.convo:
last_convo = step.convo
# Log each conversation step
print(f"Logging conversation step to {log_filename}")
with open(log_filename, 'a') as f:
log_entry = {
'timestamp': str(pd.Timestamp.now()),
'question_id': task.question_id,
'attempt_id': task.attempt_id,
'retry_idx': task.retry_idx,
'conversation': strip_all_metadata(step.convo)
}
f.write(json.dumps(log_entry) + '\n')
if isinstance(step, (FinalResultSuccessful, FinalResultWithException)):
if last_convo:
prompt_messages: Any = strip_all_metadata(last_convo)
else:
prompt_messages = ""
if isinstance(step, FinalResultSuccessful):
sampled = f"""{step.grade.grader_log}
Finish reasons: {step.finish_status=} {step.max_steps_reached=} {step.max_tokens_reached=} {step.max_time_reached=}
"""
elif isinstance(step, FinalResultWithException):
sampled = f"\n\n{step.exception}\n\n{step.traceback}"
else:
assert_never(step)
await asyncio.sleep(10) # time to shut down the docker container
return step
raise ValueError("Solver did not return a final result! This is a bug.")
@override
async def get_summary(self, results: list[tuple[ComputerTask, FinalResult]]) -> dict[str, Any]:
"""
Generate a summary of all evaluation results, including a breakdown of earnings and correctness per subtask.
"""
logger.info("Generating summary of results.")
try:
# Get the original summary from the superclass method
original_summary = await super().get_summary(results)
original_summary["total_earnings"] = 0.0
original_summary["total_available"] = 0.0
# Initialize metadata variants
original_summary["metadata_variants"] = {
"accuracy": {
"ic_swe": 0.0,
"swe_manager": 0.0,
"ic_swe_hard": 0.0,
"swe_manager_hard": 0.0,
},
"earnings": {
"ic_swe": 0.0,
"swe_manager": 0.0,
"ic_swe_hard": 0.0,
"swe_manager_hard": 0.0,
},
"available": {
"ic_swe": 0.0,
"swe_manager": 0.0,
"ic_swe_hard": 0.0,
"swe_manager_hard": 0.0,
},
}
# Counters for accuracy calculation
variant_counts = {
"ic_swe": {"correct": 0, "total": 0},
"swe_manager": {"correct": 0, "total": 0},
"ic_swe_hard": {"correct": 0, "total": 0},
"swe_manager_hard": {"correct": 0, "total": 0},
}
partial_credit_sum = 0.0
partial_credit_count = 0
for task, result in results:
try:
grader_log = json.loads(result.grade.grader_log)
earned = grader_log.get("earned", 0.0)
available = grader_log.get("available", 0.0)
variant = grader_log.get("variant", "")
assert variant in ["ic_swe", "swe_manager"], f"Unknown variant: {variant}"
# Update total earnings and available
original_summary["total_earnings"] += earned
original_summary["total_available"] += available
# Update variant-specific earnings and available
if variant in original_summary["metadata_variants"]["earnings"]:
original_summary["metadata_variants"]["earnings"][variant] += earned
original_summary["metadata_variants"]["available"][variant] += available
# Update accuracy counters
variant_counts[variant]["total"] += 1
if earned > 0:
variant_counts[variant]["correct"] += 1
# Check for hard tasks and update accordingly
if task.price > 300: # type: ignore
hard_variant = f"{variant}_hard"
if hard_variant in original_summary["metadata_variants"]["accuracy"]:
# Update earnings and available for hard variants
original_summary["metadata_variants"]["earnings"][hard_variant] += earned
original_summary["metadata_variants"]["available"][hard_variant] += available
# Update accuracy counters for hard variants
variant_counts[hard_variant]["total"] += 1
if earned > 0:
variant_counts[hard_variant]["correct"] += 1
original_summary["pytest_log"] = grader_log.get("pytest_log", "No logs found")
# Compute partial credit for each task. Use provided partial_credit if available,
# otherwise derive it: if earned equals available (and available > 0), then full credit, else 0.
pc = grader_log.get("partial_credit")
if pc is None:
pc = 1.0 if (available > 0 and earned == available) else 0.0
partial_credit_sum += pc
partial_credit_count += 1
except Exception as e:
print(str(e))
# Calculate accuracy for each variant
for variant in ["ic_swe", "swe_manager", "ic_swe_hard", "swe_manager_hard"]:
correct = variant_counts[variant]["correct"]
total = variant_counts[variant]["total"]
if total > 0:
original_summary["metadata_variants"]["accuracy"][variant] = correct / total
original_summary["average_partial_credit"] = partial_credit_sum / partial_credit_count if partial_credit_count else 0.0
return original_summary
        except Exception:
            logger.exception("Failed to generate summary.")
            raise