generate/evaluate.py
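"""Generate candidate solutions for IOI problems with an LLM and collect the results.

For each problem/subtask in the open-r1/ioi dataset, the script samples
``num_generations`` completions (via LiteLLM or a direct OpenAI-compatible HTTP
endpoint), extracts the C++ code from each response, appends every result to a
local JSONL file, and pushes the combined dataset to the HuggingFace Hub once
all generations are present.

Example invocation (org and model names are illustrative):

    python generate/evaluate.py --org_id my-org --model_id openai/gpt-4o \
        --subset test --num_generations 50 --concurrency 20
"""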
import asyncio
from collections import defaultdict
import json
from pathlib import Path
from typing import Dict, List, Optional
import random
from datetime import datetime
import uuid
from datasets import Dataset, load_dataset
from loguru import logger
from tqdm.asyncio import tqdm
import litellm
from dotenv import load_dotenv
import polars as pl
import aiofiles
from litellm.utils import ModelResponse
class IOIEvaluator:
def __init__(self, org_id: str, model_id: str, api_base: Optional[str] = None, subset: Optional[str] = None,
num_generations: int = 50, num_retries: int = 10,
concurrency: int = 10, num_problems: Optional[int] = None,
last_subtask: bool = False, dry_run: bool = False,
override: bool = False, model_postfix: Optional[str] = None,
revision: Optional[str] = None, timeout: Optional[int] = 600,
use_requests: bool = False, max_tokens: Optional[int] = None):
self.org_id = org_id
self.model_id = model_id
self.api_base = api_base
self.subset = subset
self.num_generations = num_generations
self.num_retries = num_retries
self.concurrency = concurrency
self.num_problems = num_problems
self.last_subtask = last_subtask
self.dry_run = dry_run
self.override = override
self.revision = revision
        self.timeout = timeout
self.use_litellm = not use_requests
self.max_tokens = max_tokens
# Tracking totals
self.total_prompt_tokens = 0
self.total_completion_tokens = 0
self.total_cost = 0.0
self.model_postfix = model_postfix
# Semaphore for controlling concurrency
self._semaphore = asyncio.Semaphore(concurrency)
# HTTP session for direct API calls when not using litellm
self._session = None
if self.api_base:
logger.info(f"Using API base: {self.api_base}")
if not self.use_litellm:
logger.info("Using direct asyncio requests instead of LiteLLM")
if dry_run:
logger.warning("Running in dry-run mode - no actual LLM calls will be made")
# Create results directory
self.model_dir = Path("results") / self.get_model_name()
self.model_dir.mkdir(parents=True, exist_ok=True)
# File path for the single JSONL file
self.results_file = self.model_dir / "results.jsonl"
# Lock for file access
self._file_lock = asyncio.Lock()
async def save_result_locally(self, result: Dict, year: int, problem_id: str, subtask: str, solution_number: int):
"""Save a single result to local JSONL storage with locking."""
# Ensure problem_id is included in the result
result['year'] = year
result['problem_id'] = problem_id
result['subtask'] = subtask
result['solution_number'] = solution_number
try:
# Use lock to prevent concurrent writes
async with self._file_lock:
async with aiofiles.open(self.results_file, 'a') as f:
await f.write(json.dumps(result) + '\n')
except Exception as e:
logger.error(f"Failed to save result locally: {str(e)}")
async def load_previous_results(self) -> Optional[pl.DataFrame]:
"""Load previous results from both HuggingFace Hub and local JSONL storage."""
if self.override:
logger.info("Override mode enabled - not loading previous results")
return None
results_dfs = []
# Try loading from Hub
repo_name = f"{self.org_id}/{self.get_model_name()}"
try:
logger.info(f"Attempting to load previous results from HuggingFace Hub: {repo_name}")
dataset = load_dataset(repo_name, split="train")
if dataset is not None:
                # Convert the HF dataset to a polars DataFrame
df = dataset.to_polars()
# Add a column indicating if the result is local
df = df.with_columns([
pl.lit(False).alias('is_local')
])
results_dfs.append(df)
logger.info(f"Loaded {len(df)} previous results from HuggingFace Hub")
except Exception as e:
logger.info(f"Could not load from HuggingFace Hub: {str(e)}")
# Try loading from local storage
try:
if self.results_file.exists():
results = []
async with self._file_lock:
async with aiofiles.open(self.results_file, 'r') as f:
async for line in f:
try:
result = json.loads(line.strip())
results.append(result)
except Exception as e:
logger.error(f"Failed to parse JSONL line: {str(e)}")
if results:
local_df = pl.DataFrame(results).with_columns([
pl.lit(True).alias('is_local')
])
results_dfs.append(local_df)
logger.info(f"Loaded {len(local_df)} previous results from local storage")
except Exception as e:
logger.error(f"Failed to load from local storage: {str(e)}")
# Combine results if we have any
if results_dfs:
            # Align all sources on a common column set before concatenating
common_columns = ['generation', 'code', 'language', 'model_kwargs', 'metadata', 'uuid', 'year', 'problem_id', 'subtask', 'solution_number', 'is_local']
# Add missing 'year' column with None values if needed
results_dfs = [df if 'year' in df.columns else df.with_columns(pl.lit(None).alias('year')) for df in results_dfs]
            # Drop columns that are not in common_columns
results_dfs = [df.select(common_columns) for df in results_dfs]
            # Backfill a default stop_reason in metadata for older results that predate the field
results_dfs = [df.with_columns(pl.when(pl.col('metadata').is_not_null()).then(pl.col('metadata').map_elements(lambda x: {"stop_reason": "unknown"} | x)).otherwise(pl.col('metadata')).alias('metadata')) for df in results_dfs]
# Concatenate the aligned dataframes
combined_df = pl.concat(results_dfs, how="vertical")
# First sort by whether code exists (True first), then by source (local first)
# This ensures we keep entries with code when deduplicating
deduplicated_df = (
combined_df
.with_columns([
# Add a column indicating if code exists and is non-empty
pl.when((pl.col('code').is_not_null()) & (pl.col('code') != ""))
.then(1)
.otherwise(0)
.alias('has_code'),
])
# Sort by has_code (descending) and is_local (descending)
.sort(['has_code', 'is_local'], descending=[True, True])
# Keep first occurrence after sorting (prioritizing entries with code and local source)
.unique(
subset=["year", "problem_id", "subtask", "solution_number"],
keep='first'
)
# Drop the temporary columns
.drop(['has_code', 'is_local'])
)
logger.info(f"Combined and deduplicated results: {len(deduplicated_df)} entries")
return deduplicated_df
return None
def get_dummy_response(self, prompt: str, seed: int) -> Dict:
"""Generate a dummy response for dry runs."""
dummy_code = """```cpp
int main() {
// This is a dummy solution
return 0;
}
```"""
return {
"generation": f"This is a dummy response for testing purposes.\n{dummy_code}",
"code": "int main() {\n // This is a dummy solution\n return 0;\n}",
"language": "cpp",
"model_kwargs": {
"seed": seed,
},
"metadata": {
"usage": {
'completion_tokens': 10,
'prompt_tokens': len(prompt.split()),
'total_tokens': len(prompt.split()) + 10,
'cost': 0.0
},
"timestamp": datetime.now().isoformat(),
"stop_reason": "length" # Add stop reason for dummy response
}
}
def extract_code(self, text: str) -> tuple[str, str]:
"""Extract code from the response between ```cpp and ``` markers."""
try:
parts = text.split("```cpp\n")
if len(parts) > 1:
code_block = parts[-1].split("```")[0]
code = code_block.strip()
if not code:
logger.warning("Empty code block found")
return "", "cpp"
return code, "cpp"
logger.warning("No code block found in the response")
return "", "unknown"
except Exception as e:
logger.error(f"Failed to extract code: {str(e)}")
return "", "unknown"
async def generate_completion(self, prompt: str, seed: int) -> Dict:
"""Generate completion using direct asyncio HTTP requests."""
retry_budget = self.num_retries
while retry_budget > 0:
try:
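                # Small random jitter so concurrent workers do not hit the server in lock-step.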
await asyncio.sleep(random.uniform(0.0, 0.1))
async with self._session.post(
f"{self.api_base}/v1/chat/completions",
json={
"model": "default",
"messages": [{"role": "user", "content": prompt}],
"seed": seed,
"temperature": 0.7,
"top_p": 0.8,
"max_tokens": self.max_tokens,
},
headers={"Authorization": "Bearer EMPTY"},
) as response:
result = await response.json(content_type=None)
if result is None:
logger.error("Received None response from API")
retry_budget -= 1
await asyncio.sleep(5)
continue
# Extract response content
message_content = result.get("choices", [{}])[0].get("message", {}).get("content", "")
# Extract token usage
usage = result.get("usage", {})
completion_tokens = usage.get("completion_tokens", 0)
prompt_tokens = usage.get("prompt_tokens", 0)
total_tokens = usage.get("total_tokens", 0)
# Update totals
self.total_prompt_tokens += prompt_tokens
self.total_completion_tokens += completion_tokens
# Extract code
code, language = self.extract_code(message_content)
response_dict = {
"generation": message_content,
"code": code,
"language": language,
"model_kwargs": {
"seed": seed,
},
"metadata": {
"usage": {
'completion_tokens': completion_tokens,
'prompt_tokens': prompt_tokens,
'total_tokens': total_tokens,
},
"timestamp": datetime.now().isoformat(),
"stop_reason": result.get("choices", [{}])[0].get("finish_reason", "unknown")
}
}
return response_dict
except Exception as e:
logger.exception(f"API error (will retry): {e}")
retry_budget -= 1
await asyncio.sleep(10)
raise Exception("All retries failed for direct API call")
async def call_llm(self, prompt: str, seed: int) -> Dict:
"""Call the LLM using LiteLLM's built-in retry mechanism or direct asyncio requests."""
if self.dry_run:
result = self.get_dummy_response(prompt, seed)
return result
if not self.use_litellm:
return await self.generate_completion(prompt, seed)
return await self.call_litellm(prompt, seed)
async def call_litellm(self, prompt: str, seed: int) -> Dict:
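        """Call the model through LiteLLM and normalize the response into a result dict."""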
model_name = self.model_id
kwargs = {}
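        # Models addressed with an "sglang/" prefix are routed through LiteLLM's custom_openai
        # provider against self.api_base, using a placeholder API key.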
if self.model_id.startswith("sglang/"):
model_name = model_name.replace("sglang/", "custom_openai/")
kwargs["api_base"] = self.api_base
kwargs["api_key"] = "sk-proj-1234567890"
if self.max_tokens is not None:
kwargs["max_tokens"] = self.max_tokens
response: ModelResponse = await litellm.acompletion(
model=model_name,
messages=[{"role": "user", "content": prompt, "cache_control": {"type": "ephemeral"}}],
seed=seed,
num_retries=self.num_retries,
top_p=0.8,
temperature=0.7,
timeout=self.timeout,
**kwargs
)
# Extract stop reason
stop_reason = response.choices[0].finish_reason
# Extract usage information safely
usage = {}
cost = 0.0
if hasattr(response, 'usage'):
try:
completion_tokens = getattr(response.usage, 'completion_tokens', 0)
prompt_tokens = getattr(response.usage, 'prompt_tokens', 0)
total_tokens = getattr(response.usage, 'total_tokens', 0)
# Calculate cost using litellm
try:
cost = litellm.completion_cost(completion_response=response)
except Exception as e:
logger.warning(f"Failed to calculate cost: {str(e)}")
cost = 0.0
usage = {
'completion_tokens': completion_tokens,
'prompt_tokens': prompt_tokens,
'total_tokens': total_tokens,
'cost': cost
}
# Update totals
self.total_prompt_tokens += prompt_tokens
self.total_completion_tokens += completion_tokens
self.total_cost += cost
except Exception as e:
logger.error(f"Failed to extract usage information: {str(e)}")
message_content = response.choices[0].message.content if response.choices else ""
# Extract code from the response
code, language = self.extract_code(message_content or "")
result = {
"generation": message_content,
"code": code,
"language": language,
"model_kwargs": {
"seed": seed,
},
"metadata": {
"usage": usage,
"timestamp": datetime.now().isoformat(),
"stop_reason": stop_reason
}
}
return result
async def create_solution_requests(self, subtasks: List[Dict]) -> List[Dict]:
"""Prepare result entries for a single problem."""
results = []
for subtask in subtasks:
prompt = subtask['problem']
for i in range(self.num_generations):
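                # The generation index doubles as the sampling seed, so each solution slot maps to a fixed seed.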
try:
random_uuid = str(uuid.uuid4())
results.append({
"year": subtask['year'],
"problem_id": subtask['id'],
"subtask": subtask["subtask"],
"prompt": prompt,
"generation": None,
"code": "",
"language": "unknown",
"solution_number": i,
"uuid": random_uuid,
"model_kwargs": {"seed": i},
"metadata": {
"usage": {'completion_tokens': 0, 'prompt_tokens': 0, 'total_tokens': 0, 'cost': 0.0},
"timestamp": datetime.now().isoformat()
}
})
except Exception as e:
logger.error(f"Failed to prepare prompts for problem {subtask['id']}, subtask {subtask['subtask']}: {str(e)}")
return []
return results
async def run_evaluation(self):
"""Run the evaluation for all problems."""
try:
# Create HTTP session if using direct API calls
if not self.use_litellm and not self.dry_run:
import aiohttp
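                # Cap the connection pool at the generation concurrency and cache DNS lookups.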
self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout), connector=aiohttp.TCPConnector(limit=self.concurrency, ttl_dns_cache=300, keepalive_timeout=self.timeout))
logger.info(f"Loading IOI dataset for subset: {self.subset}")
dataset = load_dataset("open-r1/ioi", split=self.subset)
problem_subtasks = defaultdict(list)
for problem in dataset:
problem_subtasks[(problem["year"], problem["id"])].append(problem)
problem_ids = list(problem_subtasks.keys())
if self.num_problems is not None:
problem_ids = problem_ids[:self.num_problems]
logger.info(f"Limited evaluation to first {self.num_problems} problems")
logger.info(f"Starting evaluation of {len(problem_ids)} problems...")
# Step 1: Generate all solution requests
all_solution_requests = []
for problem_id in tqdm(problem_ids, desc="Preparing solution requests"):
subtasks = problem_subtasks[problem_id]
if self.last_subtask:
subtasks = [subtasks[-1]]
requests = await self.create_solution_requests(subtasks)
all_solution_requests.extend(requests)
# Convert to Polars DataFrame for efficient operations
requests_df = pl.DataFrame(all_solution_requests)
logger.info(f"Created {len(requests_df)} solution requests")
# Step 2: Load previous results
previous_df = None
if not self.override:
previous_df = await self.load_previous_results()
if previous_df is not None:
logger.info(f"Loaded {len(previous_df)} previous results")
# Step 3: Merge solution requests with previous results efficiently
if previous_df is not None:
# Keep only the columns we want to preserve from previous results
preserve_cols = ['generation', 'code', 'language', 'metadata', 'model_kwargs']
preserve_cols_with_key = preserve_cols + ['year', 'problem_id', 'subtask', 'solution_number']
previous_df = previous_df.select(preserve_cols_with_key).filter(pl.col('generation').is_not_null() & (pl.col('generation') != ""))
# Merge using polars, keeping all solution requests and only matching previous results
merged_df = requests_df.join(
previous_df,
on=('year', 'problem_id', 'subtask', 'solution_number'),
how='left',
suffix='_prev'
)
# Update values from previous results where they exist
for col in preserve_cols:
prev_col = f'{col}_prev'
merged_df = merged_df.with_columns(
pl.when(pl.col(prev_col).is_not_null())
.then(pl.col(prev_col))
.otherwise(pl.col(col))
.alias(col)
)
# Drop the _prev columns
merged_df = merged_df.select([
c for c in merged_df.columns if not c.endswith('_prev')
])
else:
merged_df = requests_df
            # Select the entries that still need to be generated
to_generate_df = merged_df.filter(
(pl.col('generation').is_null()) |
(pl.col('generation') == "")
)
            # Materialize the rows that still need a generation
to_generate_dicts = to_generate_df.to_dicts()
logger.info(f"Need to generate {len(to_generate_df)} out of {len(merged_df)} total entries")
if len(to_generate_df) == 0:
logger.info("No generations needed - all results are already available")
return
# Run generations for entries without results
async def process_single(row: Dict) -> Dict:
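            """Generate one solution under the concurrency semaphore, save it locally, and return it keyed by uuid."""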
async with self._semaphore:
try:
llm_result = await self.call_llm(
row["prompt"],
row["model_kwargs"]["seed"]
)
# Log progress and token usage
if llm_result["metadata"].get("usage"):
usage = llm_result["metadata"]["usage"]
logger.info(
f"Problem {row['problem_id']} (Solution {row['solution_number']}) - "
f"Tokens: {usage.get('total_tokens', 0)} "
f"(prompt: {usage.get('prompt_tokens', 0)}, "
f"completion: {usage.get('completion_tokens', 0)}) - "
f"Cost: ${usage.get('cost', 0.0):.4f}"
)
llm_result["uuid"] = row["uuid"]
# Save result immediately
await self.save_result_locally(llm_result, row["year"], row["problem_id"], row["subtask"], row["solution_number"])
return llm_result
except Exception as e:
logger.error(f"Failed generation for problem {row['problem_id']}: {str(e)}")
error_result = {
"generation": "",
"code": "",
"language": "unknown",
"uuid": row["uuid"],
"metadata": {
"error": str(e),
"usage": {'completion_tokens': 0, 'prompt_tokens': 0, 'total_tokens': 0, 'cost': 0.0},
"timestamp": datetime.now().isoformat(),
"stop_reason": "error" # Add stop reason for error case
}
}
return error_result
# Run generations in parallel with controlled concurrency
tasks = [process_single(row) for row in to_generate_dicts]
generated_results = await tqdm.gather(*tasks, desc="Running generations")
# Convert generated results to DataFrame and update original DataFrame
generated_df = pl.DataFrame(generated_results)
            # Merge generated results back into the full request DataFrame
merged_df = merged_df.join(
generated_df,
on='uuid',
how='left',
suffix='_gen'
)
# Update the old columns with the new values
for col in ['generation', 'code', 'language', 'metadata', 'model_kwargs']:
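                # Overwrite a column only when the fresh generation is non-empty; otherwise keep the previous value.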
merged_df = merged_df.with_columns(
                    pl.when(pl.col('generation_gen').is_not_null() & (pl.col('generation_gen') != ""))
.then(pl.col(f'{col}_gen'))
.otherwise(pl.col(col))
.alias(col)
)
# Drop the _gen columns
merged_df = merged_df.select([
c for c in merged_df.columns if not c.endswith('_gen')
])
# Validate results before pushing to hub
valid_results = merged_df.filter(
(pl.col('generation').is_not_null()) &
(pl.col('generation') != "")
)
total_expected = len(merged_df)
total_valid = len(valid_results)
logger.info(f"Valid results: {total_valid}/{total_expected}")
# Only push to hub if all results are valid
if total_valid == total_expected:
# Convert to HF Dataset
output_dataset = Dataset.from_polars(merged_df)
model_name = self.get_model_name()
try:
output_dataset.push_to_hub(f"{self.org_id}/{model_name}")
logger.info(f"Pushed to hub: {self.org_id}/{model_name}")
except Exception as e:
logger.error(f"Failed to push to hub: {str(e)}")
else:
logger.warning(
f"Not pushing to hub - missing {total_expected - total_valid} valid results. "
"Results saved locally and can be retried later."
)
            # Log final statistics
logger.info(
f"Total tokens used: {self.total_prompt_tokens + self.total_completion_tokens} "
f"(prompt: {self.total_prompt_tokens}, completion: {self.total_completion_tokens})"
)
logger.info(f"Total cost: ${self.total_cost:.4f}")
# Clean up HTTP session if using direct API calls
if self._session is not None:
await self._session.close()
self._session = None
return merged_df
except Exception as e:
# Clean up HTTP session if using direct API calls
if self._session is not None:
await self._session.close()
self._session = None
raise e
def get_model_name(self):
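        """Build the results/dataset name from the model id, revision, postfix, and dry-run flag."""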
model_name = f"ioi-eval-{self.model_id.replace('/', '_')}"
if self.dry_run:
model_name = f"dummy-{model_name}"
if self.revision:
model_name = f"{model_name}-{self.revision.replace('/', '_')}"
if self.model_postfix:
model_name = f"{model_name}-{self.model_postfix}"
return model_name
def main():
load_dotenv() # Load environment variables from .env file
import argparse
parser = argparse.ArgumentParser(description="Evaluate LLMs on IOI problems")
parser.add_argument("--org_id", required=True, help="Organization ID")
parser.add_argument("--model_id", required=True, help="Model ID")
parser.add_argument("--api_base", help="API base URL for the model")
parser.add_argument("--subset", default="test", help="IOI subset to generate solutions for (train or test)")
parser.add_argument("--num_generations", type=int, default=50, help="Number of generations per problem")
parser.add_argument("--num_retries", type=int, default=10, help="Number of retries for failed API calls")
parser.add_argument("--concurrency", type=int, default=20, help="Number of concurrent generations")
parser.add_argument("--num_problems", type=int, default=None, help="Number of problems to evaluate (None for all)")
parser.add_argument("--last_subtask", action="store_true", help="Only evaluate the last subtask for each problem (usually the full problem)")
parser.add_argument("--dry_run", action="store_true", help="Run without making actual LLM calls")
parser.add_argument("--override", action="store_true", help="Override existing results and start fresh")
parser.add_argument("--model_postfix", help="Postfix for the model name")
parser.add_argument("--revision", help="Revision to use for the model")
parser.add_argument("--timeout", type=int, default=600, help="Timeout for the LLM call")
parser.add_argument("--use_requests", action="store_true", default=False, help="Use requests instead of litellm")
parser.add_argument("--max_tokens", type=int, default=None, help="Max tokens")
args = parser.parse_args()
evaluator = IOIEvaluator(
org_id=args.org_id,
model_id=args.model_id,
api_base=args.api_base,
subset=args.subset,
num_generations=args.num_generations,
num_retries=args.num_retries,
concurrency=args.concurrency,
num_problems=args.num_problems,
last_subtask=args.last_subtask,
dry_run=args.dry_run,
override=args.override,
model_postfix=args.model_postfix,
revision=args.revision,
timeout=args.timeout,
use_requests=args.use_requests,
max_tokens=args.max_tokens
)
asyncio.run(evaluator.run_evaluation())
if __name__ == "__main__":
main()