run_tests/scoring.py
import asyncio
from dataclasses import asdict, dataclass, field
from typing import Union
from piston_client import PistonClient
from utils import batched, load_ioi_tests
@dataclass
class TestResult:
"""
Represents the result of a single test case execution.
Attributes:
test_name: Name of the test case
score: Score achieved for this test (0.0 to 1.0)
status: Status code of the test result (e.g., 'AC', 'WA', 'TLE')
feedback: Detailed feedback message from the judge or an error message
"""
test_name: str
score: float = 0.0
status: str = 'SKIPPED'
    feedback: str | None = None
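
# Illustrative example (not from the original source; the test name and feedback string are hypothetical):
#   TestResult(test_name='07-03', score=0.0, status='WA', feedback="Output isn't correct")
# A test that was never executed keeps the defaults: TestResult(test_name='07-04') has status 'SKIPPED'.
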
@dataclass
class SubtaskResult:
"""
Represents the result of a subtask containing multiple test cases.
Attributes:
problem: Problem identifier
subtask: Subtask identifier
points: Maximum points available for this subtask
score_precision: Number of decimal places for score rounding
test_results: List of individual test case results
"""
    problem: str | None = None
    subtask: str | None = None
points: float = 0.0
score_precision: int = 2
test_results: list[TestResult] = field(default_factory=list)
@property
def status(self):
"""
Determines the overall status of the subtask based on the worst status among test results.
Status priorities are ordered from worst to best.
        Returns:
            str: The worst status across all test results ('SKIPPED' if there are none)
        """
        if not self.test_results:
            return 'SKIPPED'
        status_prios = {'CE': -1, 'RE': 0, 'WA': 1, 'MLE': 2, 'TLE': 3, 'PA': 4, 'AC': 5, 'SKIPPED': 999}
        return min([x.status for x in self.test_results], key=lambda x: status_prios[x])
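
    # For example, test results with statuses ['AC', 'WA', 'AC'] give 'WA' (priority 1 beats 5),
    # and ['TLE', 'RE'] gives 'RE' (0 beats 3): lower priority values are worse, and min() picks
    # the worst one.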
@property
def score(self):
"""
Calculates the raw score for the subtask as the minimum score across all test results.
Returns:
float: The rounded minimum score
"""
return 0 if not self.test_results else round(min([test_result.score for test_result in self.test_results]), self.score_precision)
@property
def weighted_score(self):
"""
Calculates the weighted score by multiplying the raw score by the available points.
Returns:
float: The rounded weighted score
"""
return 0 if not self.test_results else round(min([test_result.score for test_result in self.test_results]) * self.points, self.score_precision)
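
    # E.g. with points=35 and per-test scores [1.0, 0.5, 1.0]: score is 0.5 and weighted_score
    # is 17.5, i.e. a subtask is only worth as much as its weakest test case.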
def to_dict(self):
"""
Converts the SubtaskResult to a dictionary representation.
Returns:
dict: Dictionary containing all subtask result data
"""
return {
'problem': self.problem,
'subtask': self.subtask,
'score': self.score,
'weighted_score': self.weighted_score,
'points': self.points,
'score_precision': self.score_precision,
'status': self.status,
'test_results': [asdict(test_result) for test_result in self.test_results]
}
def _extract_single_status(score: float, feedback: str) -> str:
"""
Determines the status code based on the score and feedback message.
Args:
score: The numeric score (0.0 to 1.0)
feedback: The feedback message from the execution
Returns:
str: Status code ('CE', 'MLE', 'TLE', 'WA', 'RE', 'AC', or 'PA')
"""
if score == 0.0:
if "Compilation error" in feedback:
return 'CE'
elif "Memory limit exceeded" in feedback:
return 'MLE'
elif "Time limit exceeded" in feedback:
return 'TLE'
elif "Output isn't correct" in feedback:
return 'WA'
else:
return 'RE'
elif score == 1.0:
return 'AC'
else:
return 'PA'
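
# Mapping examples (feedback strings are illustrative): (0.0, "Time limit exceeded: ...") -> 'TLE';
# (0.0, "process exited with code 11") -> 'RE', since any zero score without a recognized message
# is treated as a runtime error; (1.0, <any feedback>) -> 'AC'; (0.5, <any feedback>) -> 'PA'.
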
async def score_single_test_case(client: PistonClient, subtask: dict, test_name: str, test_input: str, test_output: str, submission: str) -> TestResult:
"""
Scores a single test case by running the submission against the provided input and output.
Args:
client: PistonClient instance for executing code
subtask: Dictionary containing subtask configuration
test_name: Name of the test case
test_input: Input data for the test case
test_output: Expected output for the test case
submission: Source code of the submission
Returns:
TestResult: Result of the test case execution
"""
# Run submission for this test case
score, feedback = await run_submission(client, subtask, test_input, submission, test_output)
score = float(score)
return TestResult(test_name=test_name, score=score, status=_extract_single_status(score, feedback), feedback=feedback)
async def score_subtask(client: PistonClient, subtask: dict, submission: str, test_case_run_cache: Union[dict, None] = None, test_batch_size: int = 1) -> SubtaskResult:
"""
Scores all test cases in a subtask.
Args:
client: PistonClient instance for executing code
subtask: Dictionary containing subtask configuration
submission: Source code of the submission
test_case_run_cache: Optional cache of previously run test cases
        test_batch_size: Number of test cases to evaluate in parallel. After each batch, evaluation
            stops early if any test in the batch scored 0; otherwise the next batch is run.
            Use -1 to evaluate all test cases in parallel.
Returns:
SubtaskResult: Result of the subtask evaluation
"""
subtask_result = SubtaskResult(problem=subtask['id'], subtask=subtask['subtask'], points=subtask['score'], score_precision=subtask['score_precision'], test_results=[])
# tests that are not cached
tests_to_run = [
(ti, test_name)
for ti, test_name in enumerate(subtask['test_names'])
if test_case_run_cache is None or test_name not in test_case_run_cache
]
# initialize test results with cached results or empty (SKIPPED) TestResult objects
subtask_result.test_results = [
test_case_run_cache[test_name] if test_case_run_cache is not None and test_name in test_case_run_cache else
TestResult(test_name=test_name)
for test_name in subtask['test_names']
]
    # Skip running anything if no code was extracted from the submission, or if a cached
    # result already failed: the subtask score is a minimum, so it is 0 either way.
if not submission or any(test_result.status != 'SKIPPED' and test_result.score == 0.0 for test_result in subtask_result.test_results):
return subtask_result
if "test_cases" in subtask:
test_cases = subtask["test_cases"]
if isinstance(subtask["test_cases"], list):
test_cases = {
test_name: test for test_name, test in zip(subtask["test_names"], subtask["test_cases"])
}
else:
test_cases = load_ioi_tests(subtask["year"], subtask["id"])
    # Run one batch at a time; if any test in the batch scores 0, stop early (the subtask score is
    # the minimum across tests, so it is already 0), otherwise continue with the next batch.
for test_batch_to_run in batched(tests_to_run, test_batch_size):
results = await asyncio.gather(*[
asyncio.create_task(score_single_test_case(client, subtask, test_name, test_cases[test_name][0], test_cases[test_name][1], submission))
for _, test_name in test_batch_to_run
])
for (ti, test_name), test_result in zip(test_batch_to_run, results):
if test_case_run_cache is not None:
test_case_run_cache[test_name] = test_result
subtask_result.test_results[ti] = test_result
# Stop early if it failed
if any(test_result.score == 0.0 for test_result in results):
break
return subtask_result
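
# Early-stopping example: with test_batch_size=2 and five uncached tests, the first two run in
# parallel; if either scores 0.0, the remaining three stay as 'SKIPPED' TestResults, since the
# subtask score (a minimum) is already 0. With test_batch_size=-1 every test runs in a single batch.
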
async def score_subtasks(client: PistonClient, subtasks: list[dict], submission: str, test_batch_size: int = 1) -> list[SubtaskResult]:
"""
Scores multiple subtasks for a submission.
Args:
client: PistonClient instance for executing code
subtasks: List of dictionaries containing subtask configurations
submission: Source code of the submission
        test_batch_size: Number of test cases to evaluate in parallel per subtask. After each batch,
            evaluation of that subtask stops early if any test in the batch scored 0; otherwise the
            next batch is run. Use -1 to evaluate all test cases in parallel.
Returns:
list[SubtaskResult]: Results for all subtasks
"""
# avoid rerunning tests present in multiple subtasks
test_case_run_cache = {}
return [
await score_subtask(client, subtask, submission, test_case_run_cache, test_batch_size)
for subtask in subtasks
]
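
# Test cases can appear in multiple subtasks; the shared test_case_run_cache ensures each one is
# executed at most once per submission, and later subtasks reuse the cached TestResult objects
# (including failures, which trigger the early exit at the top of score_subtask).
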
async def run_submission(client: PistonClient, problem: dict, test_input: str, submission: str, test_output: str | None = None) -> tuple[str, str]:
"""
Executes a submission against a test case using the Piston execution environment.
Args:
client: PistonClient instance for executing code
problem: Dictionary containing problem configuration
test_input: Input data for the test case
submission: Source code of the submission
test_output: Optional expected output for the test case
Returns:
tuple[str, str]: A tuple containing (score, feedback)
"""
data = {
"files": [
# the actual submission
{
"name": f"graders/{problem['id'].lower()}.cpp",
"content": submission
},
# pass the input
{
"name": "input.txt",
"content": test_input
},
# pass the expected output
*([{
"name": "correct_output.txt",
"content": test_output
}] if test_output else []),
# grader files
*({
"name": name,
"content": content
} for name, content in problem['grader_files'] if content)
],
        'run_timeout': round((problem['time_limit'] + 3) * 1000),  # hard limit in ms (+3 s on top of the problem's time limit); precise time limits are handled by the ioi script
'run_memory_limit': problem['memory_limit']
}
return await client.execute(data)
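

# A minimal usage sketch (not part of the original module). The subtask dict uses only the keys
# this file reads ('id', 'subtask', 'score', 'score_precision', 'test_names', 'test_cases',
# 'time_limit', 'memory_limit', 'grader_files'); the zero-argument PistonClient() call and the
# memory_limit value are assumptions about code outside this file.
if __name__ == "__main__":
    async def _demo():
        client = PistonClient()  # assumption: defaults point at a reachable Piston endpoint
        subtask = {
            "id": "demo",
            "subtask": "01",
            "score": 100,
            "score_precision": 2,
            "test_names": ["01-01"],
            "test_cases": {"01-01": ("1 2\n", "3\n")},
            "time_limit": 1.0,
            "memory_limit": 262144,  # assumption: unit/interpretation is defined by the Piston setup
            "grader_files": [],
        }
        submission = (
            "#include <iostream>\n"
            "int main() { int a, b; std::cin >> a >> b; std::cout << a + b << std::endl; }\n"
        )
        for result in await score_subtasks(client, [subtask], submission):
            print(result.to_dict())

    asyncio.run(_demo())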