extras/rule_violation_detector/analyze.py (232 lines of code) (raw):

import asyncio import json import logging from pathlib import Path from typing import Optional import httpx import openai import pandas as pd import tenacity from openai import AsyncOpenAI as OpenAI from prompts import AFFIRMATIVE_FLAG, ALL_QUESTIONS, NEGATIVE_FLAG, PREAMBLE_CODE, PREAMBLE_LOGS from tqdm import tqdm from mlebench.utils import get_logger, get_timestamp, purple, read_jsonl logger = get_logger(__name__) # Create client on request to avoid requiring API key when this module is just imported _openai_client = None def get_openai_client(): global _openai_client if _openai_client is None: _openai_client = OpenAI( http_client=httpx.AsyncClient( limits=httpx.Limits(max_connections=1000, max_keepalive_connections=100) ) ) # dont want info logs from httpx (this is called by openai on every request) httpx_logger: logging.Logger = logging.getLogger("httpx") httpx_logger.setLevel(logging.WARNING) return _openai_client async def run_analysis( submission: Path, questions: list[str], output_dir: Path, extra_prompt: Optional[Path] = None, ) -> None: """Analyzes a run of MLE-bench and saves the results to a CSV file""" submissions = read_jsonl(submission.as_posix()) if extra_prompt is not None: with open(extra_prompt, "r") as file: extra_prompt = file.read() analysis_df = await analyze_submissions(submissions, questions, extra_prompt) timestamp = get_timestamp() save_path = output_dir / f"analysis_{timestamp}.csv" output_dir.mkdir(parents=True, exist_ok=True) analysis_df.to_csv(save_path, index=False, na_rep="") # None values saved as empty strings logger.info(purple(f"Saved analysis to {save_path}")) # Add summary of binarized questions; log and save binary_summary = get_binary_column_means(analysis_df) summary_dict = { question.replace("_binarized", ""): f"{percentage:.1f}%" for question, percentage in binary_summary.items() } summary_json = json.dumps(summary_dict, indent=2) logger.info(f"Summary of binarized questions (percentage True):\n{summary_json}") summary_save_path = output_dir / f"analysis_summary_{timestamp}.json" with open(summary_save_path, "w") as file: file.write(summary_json) async def analyze_submissions( submissions: list[dict], question_ids: list[str], extra_prompt: Optional[str] = None, max_chunk_size: int = 128_000, ): """ Analyzes logs and code from each submission across a set of questions. """ analysis_rows = [] logger.info(f"Analyzing {len(submissions)} submissions") tasks = [ analyze_single_submission(submission, question_ids, extra_prompt, max_chunk_size) for submission in submissions ] for f in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Analyzing submissions"): row = await f analysis_rows.append(row) analysis_df = pd.DataFrame(analysis_rows) return analysis_df async def analyze_single_submission( submission: dict, question_ids: list[str], extra_prompt: Optional[str], max_chunk_size: int, ) -> dict: assert "competition_id" in submission and isinstance(submission["competition_id"], str) competition_id = submission["competition_id"] logger.info(f"--- Analyzing submission for {competition_id} ---") row = { "competition_id": competition_id, "submission_path": submission["submission_path"], } logs_content = "" code_content = "" def _read_content(path: Path) -> str: """Read logs or code content from a file or directory""" content = "" if path.is_dir(): logger.warning(f"Path is a directory: {path}. Concatenating all relevant files.") for file in path.glob("**/*"): if file.suffix in [".log", ".py"]: with open(file, "r") as f: content += f"--- {file.name} ---\n" content += f.read() + "\n" else: with open(path, "r") as file: content = file.read() return content # Handle logs if "logs_path" in submission and submission["logs_path"]: logs_path = Path(submission["logs_path"]) if logs_path.exists(): logs_content = _read_content(logs_path) logger.info(f"Logs word count: {len(logs_content.split())}") else: logger.warning(f"Logs path not found: {logs_path}") else: logger.warning("Logs path not provided in submission") # Handle code if "code_path" in submission and submission["code_path"]: code_path = Path(submission["code_path"]) if code_path.exists(): code_content = _read_content(code_path) logger.info(f"Code word count: {len(code_content.split())}") else: logger.warning(f"Code path not found: {code_path}") else: logger.warning("Code path not provided in submission") tasks = [] for question_id in question_ids: question_meta = ALL_QUESTIONS[question_id] content_type = question_meta["type"] if content_type == "logs" and not logs_content: logger.warning(f"Skipping question {question_id} due to missing logs") continue elif content_type == "code" and not code_content: logger.warning(f"Skipping question {question_id} due to missing code") continue content = logs_content if content_type == "logs" else code_content question = question_meta["question"] logger.info(f"Analyzing {content_type} for question: {question_id}") tasks.append( process_question( content, content_type, question, question_id, extra_prompt, max_chunk_size, ) ) results = await asyncio.gather(*tasks) for result in results: row.update(result) return row async def process_question( content: str, content_type: str, question: str, question_id: str, extra_prompt: Optional[str], max_chunk_size: int, ) -> dict: try: if len(content) <= max_chunk_size: answer = ( await get_answer_from_analyzer(content, content_type, question, extra_prompt) ).strip() logger.debug(f"Answer: {answer}") result = { f"{content_type}_{question_id}": answer, f"{content_type}_{question_id}_binarized": None, } if answer.strip().endswith(AFFIRMATIVE_FLAG): result[f"{content_type}_{question_id}_binarized"] = True elif answer.strip().endswith(NEGATIVE_FLAG): result[f"{content_type}_{question_id}_binarized"] = False else: logger.warning(f"Text exceeds model token limit. Applying question in chunks.") # Process chunks in parallel tasks = [] binarized = [] for chunk_start in range(0, len(content), max_chunk_size): content_chunk = content[chunk_start : chunk_start + max_chunk_size] tasks.append( get_answer_from_analyzer(content_chunk, content_type, question, extra_prompt) ) answers = await asyncio.gather(*tasks) answers = [answer.strip() for answer in answers] for answer in answers: logger.debug(f"Answer: {answer}") if answer.strip().endswith(AFFIRMATIVE_FLAG): binarized.append(True) elif answer.strip().endswith(NEGATIVE_FLAG): binarized.append(False) else: binarized.append(None) result = { f"{content_type}_{question_id}": "CHUNKED_ANSWERS:\n\n" + "\n\n---------\n\n".join(answers), f"{content_type}_{question_id}_binarized": ( None if all([el is None for el in binarized]) else any(binarized) ), } return result except openai.BadRequestError as e: error_msg = f"Error analyzing {content_type} for question {question_id}: {e}" logger.error(error_msg) return {f"{content_type}_{question_id}": error_msg} @tenacity.retry( wait=tenacity.wait_exponential( multiplier=1, min=2, max=60 ), # Exponential backoff starting at 2 seconds, max 60 seconds stop=tenacity.stop_after_attempt(100), # Stop after 100 attempts retry=tenacity.retry_if_exception_type( ( # https://platform.openai.com/docs/guides/error-codes/python-library-error-types openai.APIConnectionError, openai.APITimeoutError, openai.InternalServerError, openai.RateLimitError, ) ), before=tenacity.before_log(logger, logging.DEBUG), ) async def openai_chat_completion_with_retries(*args, **kwargs): client = get_openai_client() return await client.chat.completions.create(*args, **kwargs) async def get_answer_from_analyzer( content: str, content_type: str, question: str, extra_prompt: Optional[str] = None ) -> str: """Ask a question to the analyzer LLM""" PREAMBLE = PREAMBLE_LOGS if content_type == "logs" else PREAMBLE_CODE extra_prompt = extra_prompt or "" completion = await openai_chat_completion_with_retries( model="gpt-4o-mini", messages=[ {"role": "system", "content": PREAMBLE + extra_prompt}, {"role": "user", "content": content}, {"role": "system", "content": question}, ], temperature=0.0, ) return completion.choices[0].message.content def get_binary_column_means(dataframe: pd.DataFrame): """ Get mean of all binary columns, counting True as 1 and False/None as 0 """ binary_cols = [col for col in dataframe.columns if col.endswith("_binarized")] report = dataframe[binary_cols].fillna(False).mean() * 100 # convert to percentage return report